
Databricks MCP Server

by samhavens

execute_sql_nonblocking

Execute SQL queries asynchronously on Databricks to avoid blocking while processing large datasets, returning a statement ID for tracking.

Instructions

Start SQL statement execution and return immediately with statement_id (non-blocking)

Input Schema

Name          Required  Description                     Default
statement     Yes       The SQL statement to execute
warehouse_id  Yes       ID of the SQL warehouse to use
catalog       No        Optional catalog to use         None
schema_name   No        Optional schema to use          None
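
For illustration, the arguments to this tool might look like the following Python dict. The warehouse ID is a placeholder, and samples.nyctaxi is one of the Databricks sample datasets:

    args = {
        "statement": "SELECT COUNT(*) FROM samples.nyctaxi.trips",
        "warehouse_id": "1234567890abcdef",  # placeholder warehouse ID
        "catalog": "samples",       # optional
        "schema_name": "nyctaxi",   # optional
    }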

Implementation Reference

  • The primary handler function for the 'execute_sql_nonblocking' tool, registered via the @mcp.tool() decorator. It invokes the sql.execute_statement helper, adds a user-friendly note about status checking, and serializes the response to JSON.
    @mcp.tool()
    async def execute_sql_nonblocking(
        statement: str,
        warehouse_id: str,
        catalog: Optional[str] = None,
        schema_name: Optional[str] = None
    ) -> str:
        """Start SQL statement execution and return immediately with statement_id (non-blocking)"""
        logger.info(f"Executing SQL statement (non-blocking): {statement[:100]}...")
        try:
            result = await sql.execute_statement(statement, warehouse_id, catalog, schema_name)
            
            # Add helpful info about checking status
            status = result.get("status", {}).get("state", "")
            if status == "PENDING":
                result["note"] = "Query started. Use get_sql_status with the statement_id to check progress."
                
            return json.dumps(result)
        except Exception as e:
            logger.error(f"Error executing SQL: {str(e)}")
            return json.dumps({"error": str(e)})
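
    The serialized result carries the statement_id assigned by the Databricks API, which the caller then uses to poll for completion. A minimal caller-side sketch, assuming the companion get_sql_status tool mentioned in the note accepts a statement_id (its exact signature is not shown in this reference):

    import asyncio
    import json

    async def run_and_wait(statement: str, warehouse_id: str) -> dict:
        # Start the query; returns immediately with a statement_id.
        started = json.loads(await execute_sql_nonblocking(statement, warehouse_id))
        statement_id = started["statement_id"]

        # Poll until the statement leaves the PENDING/RUNNING states
        # (assumed get_sql_status signature; adjust to the server's actual tool).
        while True:
            status = json.loads(await get_sql_status(statement_id))
            state = status.get("status", {}).get("state", "")
            if state not in ("PENDING", "RUNNING"):
                return status
            await asyncio.sleep(2)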
  • Supporting utility function that constructs the API request payload and calls the Databricks SQL Statements API (non-blocking execution with wait_timeout=0s). Falls back to an alternative endpoint if the first request fails.
    async def execute_statement(
        statement: str,
        warehouse_id: str,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        row_limit: int = 10000,
        byte_limit: int = 26214400,  # 25MB max allowed
    ) -> Dict[str, Any]:
        """
        Execute a SQL statement.
        
        Args:
            statement: The SQL statement to execute
            warehouse_id: ID of the SQL warehouse to use
            catalog: Optional catalog to use
            schema: Optional schema to use
            parameters: Optional statement parameters
            row_limit: Maximum number of rows to return
            byte_limit: Maximum number of bytes to return
            
        Returns:
            Response containing query results
            
        Raises:
            DatabricksAPIError: If the API request fails
        """
        logger.info(f"Executing SQL statement: {statement[:100]}...")
        
        request_data = {
            "statement": statement,
            "warehouse_id": warehouse_id,
            "wait_timeout": "0s",  # Return immediately, don't wait
            "row_limit": row_limit,
            "byte_limit": byte_limit,
        }
        
        if catalog:
            request_data["catalog"] = catalog
            
        if schema:
            request_data["schema"] = schema
            
        if parameters:
            request_data["parameters"] = parameters
            
        # Try the classic SQL API first (works on most workspaces)
        try:
            return make_api_request("POST", "/api/2.0/sql/statements", data=request_data)
        except Exception as e:
            # If that fails, try the newer SQL execution API
            logger.warning(f"Classic SQL API failed: {e}. Trying newer SQL execution API...")
            return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)
