execute_sql_nonblocking
Execute SQL queries asynchronously on Databricks to avoid blocking while processing large datasets, returning a statement ID for tracking.
Instructions
Start SQL statement execution and return immediately with statement_id (non-blocking)
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| statement | Yes | ||
| warehouse_id | Yes | ||
| catalog | No | ||
| schema_name | No |
Implementation Reference
- The primary handler function for the 'execute_sql_nonblocking' tool, registered via @mcp.tool() decorator. It invokes the sql.execute_statement helper, adds user-friendly notes, and serializes the response to JSON.@mcp.tool() async def execute_sql_nonblocking( statement: str, warehouse_id: str, catalog: Optional[str] = None, schema_name: Optional[str] = None ) -> str: """Start SQL statement execution and return immediately with statement_id (non-blocking)""" logger.info(f"Executing SQL statement (non-blocking): {statement[:100]}...") try: result = await sql.execute_statement(statement, warehouse_id, catalog, schema_name) # Add helpful info about checking status status = result.get("status", {}).get("state", "") if status == "PENDING": result["note"] = "Query started. Use get_sql_status with the statement_id to check progress." return json.dumps(result) except Exception as e: logger.error(f"Error executing SQL: {str(e)}") return json.dumps({"error": str(e)})
- src/api/sql.py:14-67 (helper)Supporting utility function that constructs the API request payload and calls the Databricks SQL Statements API (non-blocking execution with wait_timeout=0s). Handles fallback to alternative endpoint if needed.async def execute_statement( statement: str, warehouse_id: str, catalog: Optional[str] = None, schema: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, row_limit: int = 10000, byte_limit: int = 26214400, # 25MB max allowed ) -> Dict[str, Any]: """ Execute a SQL statement. Args: statement: The SQL statement to execute warehouse_id: ID of the SQL warehouse to use catalog: Optional catalog to use schema: Optional schema to use parameters: Optional statement parameters row_limit: Maximum number of rows to return byte_limit: Maximum number of bytes to return Returns: Response containing query results Raises: DatabricksAPIError: If the API request fails """ logger.info(f"Executing SQL statement: {statement[:100]}...") request_data = { "statement": statement, "warehouse_id": warehouse_id, "wait_timeout": "0s", # Return immediately, don't wait "row_limit": row_limit, "byte_limit": byte_limit, } if catalog: request_data["catalog"] = catalog if schema: request_data["schema"] = schema if parameters: request_data["parameters"] = parameters # Try the classic SQL API first (works on most workspaces) try: return make_api_request("POST", "/api/2.0/sql/statements", data=request_data) except Exception as e: # If that fails, try the newer SQL execution API logger.warning(f"Classic SQL API failed: {e}. Trying newer SQL execution API...") return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)