Skip to main content
Glama
explorer.py14.7 kB
""" Spellbook Explorer - Parses dbt models from Spellbook GitHub repository. This adapter clones or accesses the Spellbook GitHub repo and parses dbt models to discover available tables, schemas, and column information. """ from __future__ import annotations import os import re import subprocess import tempfile from collections.abc import Sequence from pathlib import Path from typing import Any from ...core.models import SchemaMatch, TableColumn, TableDescription, TableSummary from ...core.ports import CatalogExplorer class SpellbookExplorer(CatalogExplorer): """ Explorer that parses Spellbook dbt models from GitHub repository. Spellbook repo: https://github.com/duneanalytics/spellbook """ SPELLBOOK_REPO_URL = "https://github.com/duneanalytics/spellbook.git" DEFAULT_BRANCH = "main" def __init__( self, repo_path: Path | str | None = None, repo_url: str | None = None, branch: str | None = None, ): """ Initialize Spellbook explorer. Args: repo_path: Local path to spellbook repo (if None, will clone to temp dir) repo_url: GitHub repo URL (defaults to official spellbook repo) branch: Git branch to use (defaults to 'main') """ self.repo_url = repo_url or self.SPELLBOOK_REPO_URL self.branch = branch or self.DEFAULT_BRANCH self._repo_path: Path | None = None self._models_cache: dict[str, list[dict[str, Any]]] | None = None if repo_path: self._repo_path = Path(repo_path) else: # Use cache directory if available, otherwise temp cache_base = os.getenv("SPICE_SPELLBOOK_CACHE", tempfile.gettempdir()) self._repo_path = Path(cache_base) / "spellbook_repo" def _ensure_repo(self) -> Path: """Ensure spellbook repo is cloned locally.""" if self._repo_path is None: raise RuntimeError("Repository path not set") repo_path = self._repo_path # Clone if doesn't exist if not repo_path.exists() or not (repo_path / ".git").exists(): repo_path.parent.mkdir(parents=True, exist_ok=True) subprocess.run( ["git", "clone", "--depth", "1", "--branch", self.branch, self.repo_url, str(repo_path)], check=True, capture_output=True, ) else: # Update if exists try: subprocess.run( ["git", "-C", str(repo_path), "pull", "origin", self.branch], check=False, capture_output=True, timeout=30, ) except Exception: pass # Ignore update failures return repo_path def _load_models(self) -> dict[str, list[dict[str, Any]]]: """Load all dbt models from spellbook repo, organized by schema/subproject.""" if self._models_cache is not None: return self._models_cache repo_path = self._ensure_repo() models: dict[str, list[dict[str, Any]]] = {} # Spellbook uses subprojects - look in dbt_subprojects/ and models/ directories subproject_dirs = [ repo_path / "dbt_subprojects", repo_path / "models", ] for base_dir in subproject_dirs: if not base_dir.exists(): continue # Walk through subproject directories for subproject_dir in base_dir.iterdir(): if not subproject_dir.is_dir(): continue # Skip hidden directories if subproject_dir.name.startswith("."): continue schema_name = subproject_dir.name if schema_name not in models: models[schema_name] = [] # Find SQL model files models_dir = subproject_dir / "models" if not models_dir.exists(): models_dir = subproject_dir for sql_file in models_dir.rglob("*.sql"): # Skip files in target/ or node_modules/ if "target" in sql_file.parts or "node_modules" in sql_file.parts: continue # Extract model name from file path # models/schema/table.sql -> table model_name = sql_file.stem # Try to find schema.yml for metadata schema_yml = sql_file.parent / "schema.yml" if not schema_yml.exists(): schema_yml = sql_file.parent.parent / "schema.yml" # Parse dbt config to get actual Dune table name config = self._parse_dbt_config(sql_file) # Ignore templated dbt config values like "{{ target.schema }}" def _is_templated(val: Any) -> bool: try: s = str(val) except Exception: return False return "{{" in s and "}}" in s raw_schema = config.get("schema") raw_alias = config.get("alias") dune_schema = ( raw_schema.strip() if isinstance(raw_schema, str) else raw_schema ) dune_alias = ( raw_alias.strip() if isinstance(raw_alias, str) else raw_alias ) # Fall back to original names when values are templated or empty if not dune_schema or _is_templated(dune_schema): dune_schema = schema_name if not dune_alias or _is_templated(dune_alias): dune_alias = model_name dune_table = f"{dune_schema}.{dune_alias}" models[schema_name].append({ "name": model_name, "file": sql_file, "schema_yml": schema_yml if schema_yml.exists() else None, "schema": schema_name, "dune_schema": dune_schema, "dune_alias": dune_alias, "dune_table": dune_table, }) self._models_cache = models return models def find_schemas(self, keyword: str) -> Sequence[SchemaMatch]: """ Find schemas (subprojects) matching keyword in Spellbook repo. This searches through dbt subproject names and model descriptions. """ models = self._load_models() matches: list[SchemaMatch] = [] keyword_lower = keyword.lower() for schema_name, model_list in models.items(): # Match schema name if keyword_lower in schema_name.lower(): matches.append(SchemaMatch(schema=schema_name)) continue # Match model names/descriptions for model in model_list: if keyword_lower in model["name"].lower(): if not any(m.schema == schema_name for m in matches): matches.append(SchemaMatch(schema=schema_name)) break return matches def list_tables(self, schema: str, limit: int | None = None) -> Sequence[TableSummary]: """ List tables (dbt models) in a given schema/subproject. Returns model names from the spellbook repository. """ models = self._load_models() schema_models = models.get(schema, []) summaries = [ TableSummary(schema=schema, table=model["name"]) for model in schema_models ] if limit is not None: summaries = summaries[:limit] return summaries def describe_table(self, schema: str, table: str) -> TableDescription: """ Describe table columns by parsing dbt model SQL and schema.yml. Attempts to extract column information from: 1. schema.yml file (if exists) 2. SQL SELECT statement columns 3. Fallback to basic inference """ models = self._load_models() schema_models = models.get(schema, []) # Find matching model model_info = None for model in schema_models: if model["name"] == table: model_info = model break if model_info is None: raise ValueError(f"Table {schema}.{table} not found in Spellbook") columns: list[TableColumn] = [] # Try to parse schema.yml first if model_info["schema_yml"]: columns = self._parse_schema_yml(model_info["schema_yml"], table) # Fallback: parse SQL file for column hints if not columns: columns = self._parse_sql_columns(model_info["file"]) # If still no columns, create a basic placeholder if not columns: columns = [ TableColumn(name="column_1", dune_type="VARCHAR", polars_dtype="Utf8") ] return TableDescription( fully_qualified_name=f"{schema}.{table}", columns=columns, ) def _parse_schema_yml(self, schema_yml_path: Path, table_name: str) -> list[TableColumn]: """Parse dbt schema.yml to extract column definitions.""" try: try: import yaml except ImportError: # PyYAML not available, skip schema.yml parsing return [] with open(schema_yml_path, encoding="utf-8") as f: content = yaml.safe_load(f) if not isinstance(content, dict): return [] # Find model in schema.yml models = content.get("models", []) for model in models: if model.get("name") == table_name: cols = model.get("columns", []) return [ TableColumn( name=col.get("name", ""), dune_type=col.get("data_type", "VARCHAR"), polars_dtype=col.get("data_type"), comment=col.get("description"), ) for col in cols ] except Exception: pass return [] def _parse_dbt_config(self, sql_file: Path) -> dict[str, str]: """ Parse dbt config block from SQL file to extract schema and alias. Looks for patterns like: {{ config(schema='sui_walrus', alias='base_table') }} {{ config(schema="sui_walrus", alias="base_table") }} Returns dict with 'schema' and 'alias' keys, or empty dict if not found. """ try: with open(sql_file, encoding="utf-8") as f: sql = f.read() # Match dbt config block: {{ config(...) }} # Use non-greedy match to get first config block config_match = re.search( r"{{\s*config\s*\((.*?)\)\s*}}", sql, re.IGNORECASE | re.DOTALL, ) if not config_match: return {} config_content = config_match.group(1) result: dict[str, str] = {} # Extract schema parameter (supports single and double quotes) schema_match = re.search( r"schema\s*=\s*['\"]([^'\"]+)['\"]", config_content, re.IGNORECASE, ) if schema_match: result["schema"] = schema_match.group(1) # Extract alias parameter (supports single and double quotes) alias_match = re.search( r"alias\s*=\s*['\"]([^'\"]+)['\"]", config_content, re.IGNORECASE, ) if alias_match: result["alias"] = alias_match.group(1) return result except Exception: # On any error (file read, parsing, etc.), return empty dict # This allows fallback to using schema_name and model_name return {} def _parse_sql_columns(self, sql_file: Path) -> list[TableColumn]: """ Parse SQL file to extract column names from SELECT statements. Note: This is a best-effort heuristic and may not be perfect for complex SQL. For accurate column information, use Dune's DESCRIBE TABLE or query the actual table. """ try: with open(sql_file, encoding="utf-8") as f: sql = f.read() # Look for SELECT ... FROM patterns (simple heuristic) select_match = re.search( r"SELECT\s+(.+?)\s+FROM", sql, re.IGNORECASE | re.DOTALL, ) if select_match: cols_str = select_match.group(1) # Simple split - may not handle all nested cases perfectly # This is OK since column info is optional and best-effort cols = [] for col in cols_str.split(","): col = col.strip() # Basic cleanup - remove obvious SQL noise col = col.split()[-1] if col else "" col = col.strip().strip('"').strip("'") if col and col not in ["*", "DISTINCT", "FROM"]: cols.append( TableColumn( name=col, dune_type="VARCHAR", polars_dtype="Utf8", ) ) return cols[:20] # Limit to reasonable number except Exception: pass return []

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server