Skip to main content
Glama
snow_cli.py9.17 kB
"""Thin wrapper around the official Snowflake CLI (`snow`). This module shells out to the `snow` binary to execute SQL commands using configured profiles. It provides helpers to run inline SQL, files, and parse CSV/JSON output where available. """ from __future__ import annotations import csv import json import os import shutil import subprocess from dataclasses import dataclass from io import StringIO from typing import Any, Dict, List, Optional from .config import get_config class SnowCLIError(RuntimeError): pass def _ensure_snow_available() -> None: if shutil.which("snow") is None: raise SnowCLIError( "`snow` CLI not found. Install with `pip install snowflake-cli` and configure a profile.", ) @dataclass class QueryOutput: raw_stdout: str raw_stderr: str returncode: int # Optional parsed forms rows: Optional[List[Dict[str, Any]]] = None columns: Optional[List[str]] = None class SnowCLI: """Runner for Snowflake CLI commands.""" def __init__(self, profile: Optional[str] = None): cfg = get_config() self.profile = profile or cfg.snowflake.profile self.default_ctx = { "warehouse": cfg.snowflake.warehouse, "database": cfg.snowflake.database, "schema": cfg.snowflake.schema, "role": cfg.snowflake.role, } def _base_args( self, ctx_overrides: Optional[Dict[str, Optional[str]]] = None ) -> List[str]: # Use --connection/-c to select the configured connection name args = ["snow", "sql", "--connection", self.profile] ctx = {**self.default_ctx} if ctx_overrides: ctx.update({k: v for k, v in ctx_overrides.items() if v}) for key in ("warehouse", "database", "schema", "role"): val = ctx.get(key) if val: args.extend([f"--{key}", str(val)]) return args def run_query( self, query: str, *, output_format: Optional[str] = None, # "csv" or "json" if supported ctx_overrides: Optional[Dict[str, Optional[str]]] = None, timeout: Optional[int] = None, ) -> QueryOutput: """Execute an inline SQL statement using Snowflake CLI. Attempts to parse output when `output_format` is provided. Falls back to raw stdout if parsing is not possible. """ _ensure_snow_available() args = self._base_args(ctx_overrides) args.extend(["-q", query]) # Try to set format if supported by user's CLI version. We don't fail # if it's not supported; parsing simply won't happen. if output_format in {"csv", "json"}: args.extend(["--format", output_format]) if os.getenv("IGLOO_MCP_DEBUG") == "1": try: # Log the command about to run (without printing sensitive env) debug_cmd = " ".join(args) print(f"[IGLOO-MCP DEBUG] Executing: {debug_cmd}") # Also echo the SQL in a trimmed form for readability trimmed = " ".join(query.split()) print(f"[IGLOO-MCP DEBUG] SQL: {trimmed}") except Exception: pass proc = subprocess.run( args, capture_output=True, text=True, timeout=timeout, check=False, ) out = QueryOutput(proc.stdout, proc.stderr, proc.returncode) if proc.returncode != 0: raise SnowCLIError(proc.stderr.strip() or "Snowflake CLI returned an error") # Parse if user requested a format if output_format == "json": try: data = json.loads(proc.stdout) # Expect either a list of rows or an object with data if isinstance(data, list): out.rows = data # Extract column names from first row if available if data and isinstance(data[0], dict): out.columns = list(data[0].keys()) elif isinstance(data, dict): rows = data.get("data") or data.get("rows") if isinstance(rows, list): out.rows = rows # Extract column names from first row if available if rows and isinstance(rows[0], dict): out.columns = list(rows[0].keys()) except json.JSONDecodeError: pass elif output_format == "csv": try: sio = StringIO(proc.stdout) reader = csv.DictReader(sio) out.rows = list(reader) # type: ignore out.columns = list(reader.fieldnames or []) except Exception: pass return out def run_file( self, file_path: str, *, output_format: Optional[str] = None, ctx_overrides: Optional[Dict[str, Optional[str]]] = None, timeout: Optional[int] = None, ) -> QueryOutput: _ensure_snow_available() args = self._base_args(ctx_overrides) args.extend(["-f", file_path]) if output_format in {"csv", "json"}: args.extend(["--format", output_format]) if os.getenv("IGLOO_MCP_DEBUG") == "1": try: debug_cmd = " ".join(args) print(f"[IGLOO-MCP DEBUG] Executing file: {debug_cmd}") print(f"[IGLOO-MCP DEBUG] File: {file_path}") except Exception: pass proc = subprocess.run( args, capture_output=True, text=True, timeout=timeout, check=False, ) out = QueryOutput(proc.stdout, proc.stderr, proc.returncode) if proc.returncode != 0: raise SnowCLIError(proc.stderr.strip() or "Snowflake CLI returned an error") return out def test_connection(self) -> bool: try: out = self.run_query("SELECT 1", output_format="csv") if out.rows and len(out.rows) > 0: # try to extract numeric 1 row = list(out.rows[0].values()) return any(str(v).strip() == "1" for v in row) return bool(out.raw_stdout.strip()) except SnowCLIError: return False # Connection management helpers def list_connections(self) -> List[Dict[str, Any]]: _ensure_snow_available() proc = subprocess.run( ["snow", "connection", "list", "--format", "JSON"], capture_output=True, text=True, check=False, ) if proc.returncode != 0: raise SnowCLIError(proc.stderr.strip() or "Failed to list connections") try: data = json.loads(proc.stdout or "[]") except json.JSONDecodeError: data = [] return data if isinstance(data, list) else [] def connection_exists(self, name: str) -> bool: try: conns = self.list_connections() return any( (c.get("name") or c.get("connection_name")) == name for c in conns ) except SnowCLIError: return False def add_connection( self, name: str, *, account: str, user: str, private_key_file: str, role: Optional[str] = None, warehouse: Optional[str] = None, database: Optional[str] = None, schema: Optional[str] = None, make_default: bool = False, ) -> None: _ensure_snow_available() args = [ "snow", "connection", "add", "--connection-name", name, "--account", account, "--user", user, "--private-key", private_key_file, "--no-interactive", ] if role: args.extend(["--role", role]) if warehouse: args.extend(["--warehouse", warehouse]) if database: args.extend(["--database", database]) if schema: args.extend(["--schema", schema]) if make_default: args.append("--default") proc = subprocess.run(args, capture_output=True, text=True, check=False) if proc.returncode != 0: raise SnowCLIError(proc.stderr.strip() or "Failed to add connection") def set_default_connection(self, name: str) -> None: _ensure_snow_available() proc = subprocess.run( ["snow", "connection", "set-default", name], capture_output=True, text=True, check=False, ) if proc.returncode != 0: raise SnowCLIError( proc.stderr.strip() or "Failed to set default connection" ) def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server