
Databricks MCP Server

by samhavens

execute_sql

Run SQL queries in Databricks by specifying a statement, warehouse ID, and optional catalog or schema. The call blocks until the query completes.

Instructions

Execute a SQL statement and wait for completion (blocking)

Input Schema

Name          Required  Description                       Default
catalog       No        Optional catalog to use           –
schema_name   No        Optional schema to use            –
statement     Yes       The SQL statement to execute      –
warehouse_id  Yes       ID of the SQL warehouse to use    –
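For example, a typical tool invocation supplies the required `statement` and `warehouse_id`, optionally scoping the query with `catalog` and `schema_name`. The warehouse ID and table below are made-up placeholders, not real Databricks resources:

```python
import json

# Hypothetical arguments for an execute_sql tool call; the warehouse ID
# and table name are placeholders for illustration only.
arguments = {
    "statement": "SELECT * FROM sales LIMIT 10",
    "warehouse_id": "abc123def456",  # required
    "catalog": "main",               # optional
    "schema_name": "default",        # optional
}

# MCP tool arguments are exchanged as JSON, so they must round-trip cleanly.
print(json.dumps(arguments, indent=2))
```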

Implementation Reference

  • The primary MCP tool handler for 'execute_sql'. Decorated with @mcp.tool() for automatic registration. Executes blocking SQL queries on Databricks warehouse by delegating to the sql.execute_and_wait helper.
    @mcp.tool()
    async def execute_sql(
        statement: str,
        warehouse_id: str,
        catalog: Optional[str] = None,
        schema_name: Optional[str] = None
    ) -> str:
        """Execute a SQL statement and wait for completion (blocking)"""
        logger.info(f"Executing SQL statement (blocking): {statement[:100]}...")
        try:
            result = await sql.execute_and_wait(
                statement=statement,
                warehouse_id=warehouse_id,
                catalog=catalog,
                schema=schema_name,
                timeout_seconds=300  # 5 minutes max
            )
            return json.dumps(result)
        except Exception as e:
            logger.error(f"Error executing SQL: {str(e)}")
            return json.dumps({"error": str(e)})
  • Key helper function implementing the blocking SQL execution logic. Initiates the statement via execute_statement, then polls get_statement_status until SUCCEEDED, FAILED, etc., with configurable timeout.
    async def execute_and_wait(
        statement: str,
        warehouse_id: str,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        timeout_seconds: int = 300,  # 5 minutes
        poll_interval_seconds: int = 1,
    ) -> Dict[str, Any]:
        """
        Execute a SQL statement and wait for completion.

        Args:
            statement: The SQL statement to execute
            warehouse_id: ID of the SQL warehouse to use
            catalog: Optional catalog to use
            schema: Optional schema to use
            parameters: Optional statement parameters
            timeout_seconds: Maximum time to wait for completion
            poll_interval_seconds: How often to poll for status

        Returns:
            Response containing query results

        Raises:
            DatabricksAPIError: If the API request fails
            TimeoutError: If query execution times out
        """
        import asyncio
        import time

        logger.info(f"Executing SQL statement with waiting: {statement[:100]}...")

        # Start execution
        response = await execute_statement(
            statement=statement,
            warehouse_id=warehouse_id,
            catalog=catalog,
            schema=schema,
            parameters=parameters,
        )

        statement_id = response.get("statement_id")
        if not statement_id:
            raise ValueError("No statement_id returned from execution")

        # Poll for completion
        start_time = time.time()
        status = response.get("status", {}).get("state", "")

        while status in ["PENDING", "RUNNING"]:
            # Check timeout
            if time.time() - start_time > timeout_seconds:
                raise TimeoutError(f"Query execution timed out after {timeout_seconds} seconds")

            # Wait before polling again
            await asyncio.sleep(poll_interval_seconds)

            # Check status
            status_response = await get_statement_status(statement_id)
            status = status_response.get("status", {}).get("state", "")

            if status == "SUCCEEDED":
                return status_response
            elif status in ["FAILED", "CANCELED", "CLOSED"]:
                error_message = status_response.get("status", {}).get("error", {}).get("message", "Unknown error")
                raise DatabricksAPIError(f"Query execution failed: {error_message}", response=status_response)

        return response
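The helper follows a common poll-until-terminal pattern: start the work, then sleep and re-check state until it leaves PENDING/RUNNING or a deadline passes. A stripped-down, self-contained sketch of that pattern, with a fake status source standing in for `get_statement_status`:

```python
import asyncio
import time

async def wait_for_terminal(get_state, timeout_seconds=300, poll_interval_seconds=1):
    """Poll get_state() until the state leaves PENDING/RUNNING or the timeout expires."""
    start = time.time()
    state = await get_state()
    while state in ("PENDING", "RUNNING"):
        if time.time() - start > timeout_seconds:
            raise TimeoutError(f"timed out after {timeout_seconds} seconds")
        await asyncio.sleep(poll_interval_seconds)
        state = await get_state()
    return state

# Fake status source: reports RUNNING twice, then SUCCEEDED.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])

async def fake_get_state():
    return next(states)

final = asyncio.run(wait_for_terminal(fake_get_state, timeout_seconds=5, poll_interval_seconds=0))
print(final)  # SUCCEEDED
```

The real helper additionally distinguishes terminal states (SUCCEEDED returns results; FAILED/CANCELED/CLOSED raise), but the timeout-and-sleep loop is the same shape.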
  • Low-level API call to start SQL statement execution asynchronously. Called by both blocking and non-blocking MCP tools.
    async def execute_statement(
        statement: str,
        warehouse_id: str,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        row_limit: int = 10000,
        byte_limit: int = 26214400,  # 25MB max allowed
    ) -> Dict[str, Any]:
        """
        Execute a SQL statement.

        Args:
            statement: The SQL statement to execute
            warehouse_id: ID of the SQL warehouse to use
            catalog: Optional catalog to use
            schema: Optional schema to use
            parameters: Optional statement parameters
            row_limit: Maximum number of rows to return
            byte_limit: Maximum number of bytes to return

        Returns:
            Response containing query results

        Raises:
            DatabricksAPIError: If the API request fails
        """
        logger.info(f"Executing SQL statement: {statement[:100]}...")

        request_data = {
            "statement": statement,
            "warehouse_id": warehouse_id,
            "wait_timeout": "0s",  # Return immediately, don't wait
            "row_limit": row_limit,
            "byte_limit": byte_limit,
        }

        if catalog:
            request_data["catalog"] = catalog
        if schema:
            request_data["schema"] = schema
        if parameters:
            request_data["parameters"] = parameters

        # Try the classic SQL API first (works on most workspaces)
        try:
            return make_api_request("POST", "/api/2.0/sql/statements", data=request_data)
        except Exception as e:
            # If that fails, try the newer SQL execution API
            logger.warning(f"Classic SQL API failed: {e}. Trying newer SQL execution API...")
            return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)
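The endpoint fallback at the end is a plain try-then-retry against a second path. A minimal sketch of the same control flow, with a stub standing in for the real `make_api_request` HTTP helper (here the first path always fails so the fallback is exercised; both paths and payloads are illustrative only):

```python
# Stub for the real HTTP helper; the "classic" path is made to fail so
# the fallback path runs. This is a sketch, not the actual client code.
def make_api_request(method, path, data=None):
    if path == "/api/2.0/sql/statements":
        raise RuntimeError("404: endpoint not available on this workspace")
    return {"statement_id": "stub-id", "path_used": path}

def start_statement(request_data):
    # Try the primary endpoint first, fall back to the alternate on any error.
    try:
        return make_api_request("POST", "/api/2.0/sql/statements", data=request_data)
    except Exception as e:
        print(f"Classic SQL API failed: {e}. Trying newer SQL execution API...")
        return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)

result = start_statement({"statement": "SELECT 1", "warehouse_id": "abc123"})
print(result["path_used"])  # /api/2.0/sql/statements/execute
```

Catching bare `Exception` keeps the fallback broad; a tighter version would catch only the API error type so genuine bugs still surface.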

