execute-query
Run SQL queries (SELECT, SHOW, DESCRIBE, EXPLAIN, WITH) in Snowflake via Simple Snowflake MCP. Supports read-only mode for secure data retrieval and outputs results in markdown format.
Instructions
Execute a SQL query in read-only mode (SELECT, SHOW, DESCRIBE, EXPLAIN, WITH) or not (if 'read_only' is false), result in markdown format.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| read_only | No | Allow only read-only queries | |
| sql | Yes | SQL query to execute |
Implementation Reference
- Main execution handler for the 'execute-query' tool. Validates parameters, enforces read-only restrictions, applies automatic row limits for SELECT queries, executes the SQL via _safe_snowflake_execute, formats output based on requested format, and returns results or errors.elif name == "execute-query": sql = args.get("sql") if not sql: raise ValueError("'sql' parameter is required") read_only = args.get("read_only", MCP_READ_ONLY) format_type = args.get("format", "markdown") limit = args.get("limit") # Check if query is allowed in read-only mode allowed_commands = ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "WITH"] first_word = sql.strip().split()[0].upper() if sql.strip() else "" if read_only and first_word not in allowed_commands: return [types.TextContent(type="text", text="Only read-only queries are allowed in read-only mode.")] # Apply limit if specified, or use default for SELECT queries if limit: # Validate limit doesn't exceed maximum if limit > MAX_QUERY_LIMIT: limit = MAX_QUERY_LIMIT logger.warning(f"Query limit reduced from {args.get('limit')} to maximum {MAX_QUERY_LIMIT}") elif first_word == "SELECT" and "LIMIT" not in sql.upper(): # Apply default limit for SELECT queries without explicit limit limit = DEFAULT_QUERY_LIMIT logger.info(f"Applying default limit {DEFAULT_QUERY_LIMIT} to SELECT query") if limit and "LIMIT" not in sql.upper(): sql += f" LIMIT {limit}" result = _safe_snowflake_execute(sql, "Execute query") if result["success"]: if format_type == "markdown": output = _format_markdown_table(result["data"]) else: output = json.dumps(result["data"], indent=2, default=str) return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Snowflake error: {result['error']}")]
- src/simple_snowflake_mcp/server.py:643-657 (registration)Tool registration in handle_list_tools(). Registers the 'execute-query' tool with detailed input schema defining parameters: sql (required), read_only (boolean, default true), format (enum: markdown/json/csv, default markdown), limit (integer 1-50000).types.Tool( name="execute-query", description="Execute a SQL query with read-only protection and flexible output format", inputSchema={ "type": "object", "properties": { "sql": {"type": "string", "description": "SQL query to execute", "minLength": 1}, "read_only": {"type": "boolean", "default": True, "description": "Allow only read-only queries"}, "format": {"type": "string", "enum": ["markdown", "json", "csv"], "default": "markdown"}, "limit": {"type": "integer", "minimum": 1, "maximum": 50000, "description": "Maximum rows to return"} }, "required": ["sql"], "additionalProperties": False }, ),
- JSON Schema for 'execute-query' tool inputs, defining validation rules for sql, read_only, format, and limit parameters.inputSchema={ "type": "object", "properties": { "sql": {"type": "string", "description": "SQL query to execute", "minLength": 1}, "read_only": {"type": "boolean", "default": True, "description": "Allow only read-only queries"}, "format": {"type": "string", "enum": ["markdown", "json", "csv"], "default": "markdown"}, "limit": {"type": "integer", "minimum": 1, "maximum": 50000, "description": "Maximum rows to return"} }, "required": ["sql"], "additionalProperties": False }, ),
- Helper function that performs the actual Snowflake query execution, connection management, result processing into JSON-compatible dicts, and comprehensive error handling. Called by the execute-query handler.def _safe_snowflake_execute(query: str, description: str = "Query") -> Dict[str, Any]: """ Safely execute a Snowflake query with proper error handling and logging. """ try: logger.info(f"Executing {description}: {query[:100]}...") ctx = snowflake.connector.connect(**SNOWFLAKE_CONFIG) cur = ctx.cursor() cur.execute(query) # Handle different query types if cur.description: rows = cur.fetchall() columns = [desc[0] for desc in cur.description] result = [dict(zip(columns, row)) for row in rows] else: result = {"status": "success", "rowcount": cur.rowcount} cur.close() ctx.close() logger.info(f"{description} completed successfully") return {"success": True, "data": result} except Exception as e: logger.error(f"{description} failed: {str(e)}") return {"success": False, "error": str(e), "data": None}
- Helper function to format query results as a markdown table, used when format='markdown' in the execute-query tool.def _format_markdown_table(data: List[Dict[str, Any]]) -> str: """Format query results as a markdown table.""" if not data: return "No results found." columns = list(data[0].keys()) header = "| " + " | ".join(columns) + " |" separator = "|" + "---|" * len(columns) rows = [] for row in data: row_str = "| " + " | ".join(str(row.get(col, "")) for col in columns) + " |" rows.append(row_str) return header + "\n" + separator + "\n" + "\n".join(rows)