
Databricks MCP Server

by samhavens

execute_sql

Execute SQL statements on Databricks to query data, run analytics, and manage databases. The call blocks until the statement finishes, fails, or the 5-minute timeout expires.

Instructions

Execute a SQL statement and wait for completion (blocking)
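Blocking means the tool submits the statement and then polls its state until it leaves PENDING or RUNNING or a deadline passes. The following is a minimal, self-contained sketch of that pattern using only asyncio. It is not the server's own code: poll_until_done and fake_status_source are hypothetical names, and fetch_state stands in for a real status request to Databricks.

```python
import asyncio
import time
from typing import Awaitable, Callable, Iterable

# States treated as "still working"; anything else is terminal
# (assumption mirroring the PENDING/RUNNING check in the server's loop).
IN_PROGRESS = {"PENDING", "RUNNING"}


async def poll_until_done(
    fetch_state: Callable[[], Awaitable[str]],
    timeout_seconds: float = 300.0,
    poll_interval_seconds: float = 1.0,
) -> str:
    """Poll a (hypothetical) status source until it reports a terminal state."""
    deadline = time.monotonic() + timeout_seconds
    state = await fetch_state()
    while state in IN_PROGRESS:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {state} after {timeout_seconds} seconds")
        # Yield to the event loop instead of blocking it with time.sleep().
        await asyncio.sleep(poll_interval_seconds)
        state = await fetch_state()
    return state


def fake_status_source(states: Iterable[str]) -> Callable[[], Awaitable[str]]:
    """Return an async callable that replays a fixed sequence of states."""
    it = iter(states)

    async def fetch_state() -> str:
        return next(it)

    return fetch_state


if __name__ == "__main__":
    source = fake_status_source(["PENDING", "RUNNING", "SUCCEEDED"])
    print(asyncio.run(poll_until_done(source, poll_interval_seconds=0.01)))
```

Measuring the deadline with time.monotonic() keeps the timeout correct even if the system clock is adjusted while a query runs.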

Input Schema

Name          Required  Description                      Default
statement     Yes       The SQL statement to execute
warehouse_id  Yes       ID of the SQL warehouse to use
catalog       No        Catalog to use                   None
schema_name   No        Schema to use                    None
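An MCP client calls this tool with a JSON-RPC tools/call request whose arguments object follows the schema above, and the handler replies with a JSON string, reporting failures as {"error": "..."} rather than raising. The sketch below assumes a client that hands back that string as text; build_call and parse_result are hypothetical helper names, not part of this server or of any MCP SDK.

```python
import json
from typing import Any, Dict, Optional

REQUIRED = ("statement", "warehouse_id")
OPTIONAL = ("catalog", "schema_name")


def build_call(request_id: int, **arguments: Optional[str]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 tools/call request for execute_sql (hypothetical helper)."""
    missing = [name for name in REQUIRED if not arguments.get(name)]
    if missing:
        raise ValueError(f"missing required argument(s): {', '.join(missing)}")
    unknown = set(arguments) - set(REQUIRED) - set(OPTIONAL)
    if unknown:
        raise ValueError(f"unexpected argument(s): {', '.join(sorted(unknown))}")
    # Drop optional arguments left as None so the server applies its defaults.
    args = {key: value for key, value in arguments.items() if value is not None}
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "execute_sql", "arguments": args},
    }


def parse_result(text: str) -> Any:
    """Decode the tool's JSON string, turning {"error": ...} into an exception."""
    payload = json.loads(text)
    if isinstance(payload, dict) and "error" in payload:
        raise RuntimeError(payload["error"])
    return payload
```

For example, build_call(1, statement="SELECT 1", warehouse_id="<warehouse-id>") produces a request carrying only the two required arguments, because optional arguments left as None are omitted.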

Implementation Reference

  • The MCP tool handler for 'execute_sql'. It registers the tool with the @mcp.tool() decorator, delegates to the sql.execute_and_wait helper with a 300-second timeout, and returns the result as a JSON string. Any exception is caught and returned as {"error": "<message>"} instead of being raised.
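    import json
    from typing import Optional

    # mcp, sql, and logger are module-level objects defined elsewhere in the server
    @mcp.tool()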
    async def execute_sql(
        statement: str,
        warehouse_id: str,
        catalog: Optional[str] = None,
        schema_name: Optional[str] = None
    ) -> str:
        """Execute a SQL statement and wait for completion (blocking)"""
        logger.info(f"Executing SQL statement (blocking): {statement[:100]}...")
        try:
            result = await sql.execute_and_wait(
                statement=statement,
                warehouse_id=warehouse_id,
                catalog=catalog,
                schema=schema_name,
                timeout_seconds=300  # 5 minutes max
            )
            return json.dumps(result)
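        except Exception as e:
            # Any exception, including TimeoutError from the helper, is returned
            # to the client as {"error": ...} instead of being raised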
            logger.error(f"Error executing SQL: {str(e)}")
            return json.dumps({"error": str(e)})
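  • Helper that submits the statement with execute_statement and then polls get_statement_status every poll_interval_seconds until the statement leaves the PENDING/RUNNING states. It raises TimeoutError once timeout_seconds have elapsed and DatabricksAPIError when a status check reports FAILED, CANCELED, or CLOSED. This is the core blocking logic behind the tool handler.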
    import asyncio
    import time
    from typing import Any, Dict, Optional

    # logger, execute_statement, get_statement_status, and DatabricksAPIError
    # are defined elsewhere in the same module
    async def execute_and_wait(
        statement: str,
        warehouse_id: str,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        timeout_seconds: int = 300,  # 5 minutes
        poll_interval_seconds: int = 1,
    ) -> Dict[str, Any]:
        """
        Execute a SQL statement and wait for completion.

        Args:
            statement: The SQL statement to execute
            warehouse_id: ID of the SQL warehouse to use
            catalog: Optional catalog to use
            schema: Optional schema to use
            parameters: Optional statement parameters
            timeout_seconds: Maximum time to wait for completion
            poll_interval_seconds: How often to poll for status

        Returns:
            Response containing query results

        Raises:
            DatabricksAPIError: If the API request fails or the statement ends in
                FAILED, CANCELED, or CLOSED
            TimeoutError: If query execution times out
        """
        logger.info(f"Executing SQL statement with waiting: {statement[:100]}...")

        # Start execution (execute_statement uses wait_timeout "0s", so this returns at once)
        response = await execute_statement(
            statement=statement,
            warehouse_id=warehouse_id,
            catalog=catalog,
            schema=schema,
            parameters=parameters,
        )

        statement_id = response.get("statement_id")
        if not statement_id:
            raise ValueError("No statement_id returned from execution")

        # Poll for completion; time.monotonic() is unaffected by wall-clock changes
        deadline = time.monotonic() + timeout_seconds
        latest = response
        status = latest.get("status", {}).get("state", "")

        while status in ("PENDING", "RUNNING"):
            if time.monotonic() > deadline:
                raise TimeoutError(f"Query execution timed out after {timeout_seconds} seconds")

            # Wait before polling again without blocking the event loop
            await asyncio.sleep(poll_interval_seconds)

            latest = await get_statement_status(statement_id)
            status = latest.get("status", {}).get("state", "")

        # Check the terminal state after the loop so a failure already reported
        # by the initial response is raised too, not returned as a result
        if status in ("FAILED", "CANCELED", "CLOSED"):
            error_message = latest.get("status", {}).get("error", {}).get("message", "Unknown error")
            raise DatabricksAPIError(f"Query execution failed: {error_message}", response=latest)

        return latest
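  • Low-level helper used by execute_and_wait. It POSTs the statement to /api/2.0/sql/statements with wait_timeout set to "0s", so the response comes back immediately with a statement ID instead of waiting for results. If that request raises, it logs a warning and retries against /api/2.0/sql/statements/execute.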
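    from typing import Any, Dict, Optional

    # logger and make_api_request are defined elsewhere in the same module
    async def execute_statement(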
        statement: str,
        warehouse_id: str,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        row_limit: int = 10000,
        byte_limit: int = 26214400,  # 25 MiB (25 * 1024 * 1024), the maximum allowed
    ) -> Dict[str, Any]:
        """
        Execute a SQL statement.
        
        Args:
            statement: The SQL statement to execute
            warehouse_id: ID of the SQL warehouse to use
            catalog: Optional catalog to use
            schema: Optional schema to use
            parameters: Optional statement parameters
            row_limit: Maximum number of rows to return
            byte_limit: Maximum number of bytes to return
            
        Returns:
            Response containing query results
            
        Raises:
            DatabricksAPIError: If the API request fails
        """
        logger.info(f"Executing SQL statement: {statement[:100]}...")
        
        request_data = {
            "statement": statement,
            "warehouse_id": warehouse_id,
            "wait_timeout": "0s",  # Return immediately, don't wait
            "row_limit": row_limit,
            "byte_limit": byte_limit,
        }
        
        if catalog:
            request_data["catalog"] = catalog
            
        if schema:
            request_data["schema"] = schema
            
        if parameters:
            request_data["parameters"] = parameters
            
        # Try the classic SQL API first (works on most workspaces)
        try:
            return make_api_request("POST", "/api/2.0/sql/statements", data=request_data)
        except Exception as e:
            # Any exception, not only an unsupported endpoint, triggers this fallback;
            # the first error is kept only in the warning logged below
            logger.warning(f"Classic SQL API failed: {e}. Trying newer SQL execution API...")
            return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)
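The request body that execute_statement sends can be reproduced in isolation. This stdlib-only sketch uses the hypothetical name build_statement_body and copies the listing's defaults: a 10,000-row limit, a byte limit of 25 × 1024 × 1024 = 26,214,400 bytes, and wait_timeout "0s" so the API returns a statement ID immediately and leaves the waiting to the caller.

```python
from typing import Any, Dict, Optional

DEFAULT_ROW_LIMIT = 10_000
DEFAULT_BYTE_LIMIT = 25 * 1024 * 1024  # 26,214,400 bytes, as in the listing


def build_statement_body(
    statement: str,
    warehouse_id: str,
    catalog: Optional[str] = None,
    schema: Optional[str] = None,
    parameters: Optional[Any] = None,
    row_limit: int = DEFAULT_ROW_LIMIT,
    byte_limit: int = DEFAULT_BYTE_LIMIT,
) -> Dict[str, Any]:
    """Assemble the JSON body the listing POSTs to /api/2.0/sql/statements (sketch)."""
    body: Dict[str, Any] = {
        "statement": statement,
        "warehouse_id": warehouse_id,
        "wait_timeout": "0s",  # return immediately; the caller polls for status
        "row_limit": row_limit,
        "byte_limit": byte_limit,
    }
    # Same truthiness checks as the listing: empty values are treated as unset,
    # and parameters are passed through unchanged.
    for key, value in (("catalog", catalog), ("schema", schema), ("parameters", parameters)):
        if value:
            body[key] = value
    return body
```

Because optional fields are added only when truthy, an empty schema string behaves exactly like no schema at all, matching the if checks in the listing.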

