execute_sql_nonblocking
Initiate SQL statement execution on Databricks and receive a statement ID immediately, enabling asynchronous processing without waiting for query completion.
Instructions
Start SQL statement execution and return immediately with statement_id (non-blocking)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| catalog | No | Optional catalog to use. | `None` |
| schema_name | No | Optional schema to use. | `None` |
| statement | Yes | The SQL statement to execute. | |
| warehouse_id | Yes | ID of the SQL warehouse to use. | |
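
As a quick illustration, here is a minimal sketch of calling the handler directly and inspecting the immediate response. The warehouse ID is a placeholder, and it assumes the decorated handler (shown under Implementation Reference below) is importable and directly callable:

```python
import asyncio
import json

async def main() -> None:
    # Only statement and warehouse_id are required; catalog/schema_name default to None.
    raw = await execute_sql_nonblocking(
        statement="SELECT 1",
        warehouse_id="abcd1234ef567890",  # placeholder warehouse ID
    )
    result = json.loads(raw)
    # Illustrative shape: {"statement_id": "...", "status": {"state": "PENDING"}, "note": "..."}
    print(result.get("statement_id"), result.get("status", {}).get("state"))

asyncio.run(main())
```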
Implementation Reference
- The MCP tool handler for `execute_sql_nonblocking`. It initiates non-blocking SQL execution via the `sql.execute_statement` helper, adds a user-friendly note, and returns the result as JSON.

```python
@mcp.tool()
async def execute_sql_nonblocking(
    statement: str,
    warehouse_id: str,
    catalog: Optional[str] = None,
    schema_name: Optional[str] = None
) -> str:
    """Start SQL statement execution and return immediately with statement_id (non-blocking)"""
    logger.info(f"Executing SQL statement (non-blocking): {statement[:100]}...")
    try:
        result = await sql.execute_statement(statement, warehouse_id, catalog, schema_name)
        # Add helpful info about checking status
        status = result.get("status", {}).get("state", "")
        if status == "PENDING":
            result["note"] = "Query started. Use get_sql_status with the statement_id to check progress."
        return json.dumps(result)
    except Exception as e:
        logger.error(f"Error executing SQL: {str(e)}")
        return json.dumps({"error": str(e)})
```
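
Since the handler returns as soon as the statement is submitted, callers poll for completion. Below is a minimal sketch of that pattern, assuming the companion `get_sql_status` tool mentioned in the note accepts a `statement_id` and returns a JSON string with the same `status` structure (its exact signature is not shown in this section):

```python
import asyncio
import json

async def wait_for_statement(statement_id: str, poll_seconds: float = 2.0) -> dict:
    """Poll get_sql_status until the statement leaves a non-terminal state (sketch)."""
    while True:
        raw = await get_sql_status(statement_id=statement_id)  # assumed companion tool
        result = json.loads(raw)
        state = result.get("status", {}).get("state", "")
        if state not in ("PENDING", "RUNNING"):
            return result  # e.g. SUCCEEDED, FAILED, or CANCELED
        await asyncio.sleep(poll_seconds)
```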
- `src/api/sql.py:14-67` (helper). Supporting utility that performs the core Databricks SQL API request to start non-blocking statement execution, with a fallback to an alternative endpoint and configurable row/byte limits.

```python
async def execute_statement(
    statement: str,
    warehouse_id: str,
    catalog: Optional[str] = None,
    schema: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    row_limit: int = 10000,
    byte_limit: int = 26214400,  # 25MB max allowed
) -> Dict[str, Any]:
    """
    Execute a SQL statement.

    Args:
        statement: The SQL statement to execute
        warehouse_id: ID of the SQL warehouse to use
        catalog: Optional catalog to use
        schema: Optional schema to use
        parameters: Optional statement parameters
        row_limit: Maximum number of rows to return
        byte_limit: Maximum number of bytes to return

    Returns:
        Response containing query results

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Executing SQL statement: {statement[:100]}...")

    request_data = {
        "statement": statement,
        "warehouse_id": warehouse_id,
        "wait_timeout": "0s",  # Return immediately, don't wait
        "row_limit": row_limit,
        "byte_limit": byte_limit,
    }

    if catalog:
        request_data["catalog"] = catalog
    if schema:
        request_data["schema"] = schema
    if parameters:
        request_data["parameters"] = parameters

    # Try the classic SQL API first (works on most workspaces)
    try:
        return make_api_request("POST", "/api/2.0/sql/statements", data=request_data)
    except Exception as e:
        # If that fails, try the newer SQL execution API
        logger.warning(f"Classic SQL API failed: {e}. Trying newer SQL execution API...")
        return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)
```
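
Two details worth noting: `wait_timeout: "0s"` is what makes the call non-blocking (the API hands back a statement_id immediately instead of waiting for results), and the HTTP work is delegated to `make_api_request`, which this section does not define. Below is a minimal sketch of what such a helper might look like, assuming token auth from environment variables and the `requests` library; the real implementation may differ:

```python
import os
from typing import Any, Dict, Optional

import requests

class DatabricksAPIError(Exception):
    """Raised when a Databricks REST call fails (name taken from the docstring above)."""

def make_api_request(method: str, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Issue a workspace REST request and return the parsed JSON body (sketch)."""
    host = os.environ["DATABRICKS_HOST"].rstrip("/")  # e.g. https://<workspace>.cloud.databricks.com
    token = os.environ["DATABRICKS_TOKEN"]            # personal access token
    response = requests.request(
        method,
        f"{host}{endpoint}",
        headers={"Authorization": f"Bearer {token}"},
        json=data,
        timeout=30,
    )
    if not response.ok:
        raise DatabricksAPIError(f"{response.status_code}: {response.text}")
    return response.json()
```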