Skip to main content
Glama
YannBrrd

Simple Snowflake MCP

by YannBrrd

execute-query

Run SQL queries (SELECT, SHOW, DESCRIBE, EXPLAIN, WITH) in Snowflake via Simple Snowflake MCP. Supports read-only mode for secure data retrieval and outputs results in markdown format.

Instructions

Execute a SQL query in read-only mode (SELECT, SHOW, DESCRIBE, EXPLAIN, WITH) or not (if 'read_only' is false), result in markdown format.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
read_onlyNoAllow only read-only queries
sqlYesSQL query to execute

Implementation Reference

  • Main execution handler for the 'execute-query' tool. Validates parameters, enforces read-only restrictions, applies automatic row limits for SELECT queries, executes the SQL via _safe_snowflake_execute, formats output based on requested format, and returns results or errors.
    elif name == "execute-query":
        sql = args.get("sql")
        if not sql:
            raise ValueError("'sql' parameter is required")
            
        read_only = args.get("read_only", MCP_READ_ONLY)
        format_type = args.get("format", "markdown")
        limit = args.get("limit")
        
        # Check if query is allowed in read-only mode
        allowed_commands = ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "WITH"]
        first_word = sql.strip().split()[0].upper() if sql.strip() else ""
        
        if read_only and first_word not in allowed_commands:
            return [types.TextContent(type="text", text="Only read-only queries are allowed in read-only mode.")]
        
        # Apply limit if specified, or use default for SELECT queries
        if limit:
            # Validate limit doesn't exceed maximum
            if limit > MAX_QUERY_LIMIT:
                limit = MAX_QUERY_LIMIT
                logger.warning(f"Query limit reduced from {args.get('limit')} to maximum {MAX_QUERY_LIMIT}")
        elif first_word == "SELECT" and "LIMIT" not in sql.upper():
            # Apply default limit for SELECT queries without explicit limit
            limit = DEFAULT_QUERY_LIMIT
            logger.info(f"Applying default limit {DEFAULT_QUERY_LIMIT} to SELECT query")
        
        if limit and "LIMIT" not in sql.upper():
            sql += f" LIMIT {limit}"
        
        result = _safe_snowflake_execute(sql, "Execute query")
        if result["success"]:
            if format_type == "markdown":
                output = _format_markdown_table(result["data"])
            else:
                output = json.dumps(result["data"], indent=2, default=str)
            return [types.TextContent(type="text", text=output)]
        else:
            return [types.TextContent(type="text", text=f"Snowflake error: {result['error']}")]
  • Tool registration in handle_list_tools(). Registers the 'execute-query' tool with detailed input schema defining parameters: sql (required), read_only (boolean, default true), format (enum: markdown/json/csv, default markdown), limit (integer 1-50000).
    types.Tool(
        name="execute-query",
        description="Execute a SQL query with read-only protection and flexible output format",
        inputSchema={
            "type": "object",
            "properties": {
                "sql": {"type": "string", "description": "SQL query to execute", "minLength": 1},
                "read_only": {"type": "boolean", "default": True, "description": "Allow only read-only queries"},
                "format": {"type": "string", "enum": ["markdown", "json", "csv"], "default": "markdown"},
                "limit": {"type": "integer", "minimum": 1, "maximum": 50000, "description": "Maximum rows to return"}
            },
            "required": ["sql"],
            "additionalProperties": False
        },
    ),
  • JSON Schema for 'execute-query' tool inputs, defining validation rules for sql, read_only, format, and limit parameters.
        inputSchema={
            "type": "object",
            "properties": {
                "sql": {"type": "string", "description": "SQL query to execute", "minLength": 1},
                "read_only": {"type": "boolean", "default": True, "description": "Allow only read-only queries"},
                "format": {"type": "string", "enum": ["markdown", "json", "csv"], "default": "markdown"},
                "limit": {"type": "integer", "minimum": 1, "maximum": 50000, "description": "Maximum rows to return"}
            },
            "required": ["sql"],
            "additionalProperties": False
        },
    ),
  • Helper function that performs the actual Snowflake query execution, connection management, result processing into JSON-compatible dicts, and comprehensive error handling. Called by the execute-query handler.
    def _safe_snowflake_execute(query: str, description: str = "Query") -> Dict[str, Any]:
        """
        Safely execute a Snowflake query with proper error handling and logging.
        """
        try:
            logger.info(f"Executing {description}: {query[:100]}...")
            ctx = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
            cur = ctx.cursor()
            cur.execute(query)
            
            # Handle different query types
            if cur.description:
                rows = cur.fetchall()
                columns = [desc[0] for desc in cur.description]
                result = [dict(zip(columns, row)) for row in rows]
            else:
                result = {"status": "success", "rowcount": cur.rowcount}
                
            cur.close()
            ctx.close()
            logger.info(f"{description} completed successfully")
            return {"success": True, "data": result}
            
        except Exception as e:
            logger.error(f"{description} failed: {str(e)}")
            return {"success": False, "error": str(e), "data": None}
  • Helper function to format query results as a markdown table, used when format='markdown' in the execute-query tool.
    def _format_markdown_table(data: List[Dict[str, Any]]) -> str:
        """Format query results as a markdown table."""
        if not data:
            return "No results found."
        
        columns = list(data[0].keys())
        header = "| " + " | ".join(columns) + " |"
        separator = "|" + "---|" * len(columns)
        
        rows = []
        for row in data:
            row_str = "| " + " | ".join(str(row.get(col, "")) for col in columns) + " |"
            rows.append(row_str)
        
        return header + "\n" + separator + "\n" + "\n".join(rows)
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/YannBrrd/simple_snowflake_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server