execute_sql
Execute SQL queries on Databricks with parameter support for warehouses, catalogs, and schemas to retrieve and manage data.
Instructions
Execute a SQL statement with parameters: statement (required), warehouse_id (required), catalog (optional), schema (optional)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
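| params | Yes | Dict with `statement` (required), `warehouse_id` (required), `catalog` (optional), `schema` (optional) | |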
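
As a rough illustration of the `params` shape described above, the sketch below assembles the dict and rejects keys the tool description does not list. The helper name `build_params` and the warehouse ID `abc123` are hypothetical and not part of the server; the server itself does not appear to validate keys this way.

```python
from typing import Any, Dict

# Key names taken from the tool description; nothing else is assumed valid.
OPTIONAL_KEYS = ("catalog", "schema")


def build_params(statement: str, warehouse_id: str, **optional: str) -> Dict[str, Any]:
    """Assemble the `params` dict for execute_sql (hypothetical helper)."""
    if not statement or not warehouse_id:
        raise ValueError("statement and warehouse_id are required")
    unknown = set(optional) - set(OPTIONAL_KEYS)
    if unknown:
        raise ValueError(f"Unsupported keys: {sorted(unknown)}")
    params: Dict[str, Any] = {"statement": statement, "warehouse_id": warehouse_id}
    # Include only the optional keys that were actually supplied.
    params.update({key: value for key, value in optional.items() if value})
    return params


# "abc123" is a placeholder warehouse ID, not a real one.
example = build_params("SELECT 1", "abc123", catalog="main")
```

Here `example` holds the three keys that were supplied and omits `schema` altogether, which matches how the handler treats a missing optional key as `None`.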
Implementation Reference
- src/server/databricks_mcp_server.py:193-196 (registration): Registration of the execute_sql tool using the @self.tool decorator. The description string lists the input parameters and serves as the schema.

  ```python
  @self.tool(
      name="execute_sql",
      description="Execute a SQL statement with parameters: statement (required), warehouse_id (required), catalog (optional), schema (optional)",
  )
  ```
- The handler function that implements the core logic of the execute_sql tool: it parses the input params and calls the sql module's execute_sql function.

  ```python
  async def execute_sql(params: Dict[str, Any]) -> List[TextContent]:
      logger.info(f"Executing SQL with params: {params}")
      try:
          statement = params.get("statement")
          warehouse_id = params.get("warehouse_id")
          catalog = params.get("catalog")
          schema = params.get("schema")

          result = await sql.execute_sql(statement, warehouse_id, catalog, schema)
          return [{"text": json.dumps(result)}]
      except Exception as e:
          logger.error(f"Error executing SQL: {str(e)}")
          return [{"text": json.dumps({"error": str(e)})}]
  ```
- src/api/sql.py:14-60 (helper): Supporting function execute_statement in sql.py, which likely implements the SQL execution logic. Note that the handler calls sql.execute_sql rather than execute_statement, so there may be a naming mismatch.

  ```python
  async def execute_statement(
      statement: str,
      warehouse_id: str,
      catalog: Optional[str] = None,
      schema: Optional[str] = None,
      parameters: Optional[Dict[str, Any]] = None,
      row_limit: int = 10000,
      byte_limit: int = 100000000,  # 100MB
  ) -> Dict[str, Any]:
      """
      Execute a SQL statement.

      Args:
          statement: The SQL statement to execute
          warehouse_id: ID of the SQL warehouse to use
          catalog: Optional catalog to use
          schema: Optional schema to use
          parameters: Optional statement parameters
          row_limit: Maximum number of rows to return
          byte_limit: Maximum number of bytes to return

      Returns:
          Response containing query results

      Raises:
          DatabricksAPIError: If the API request fails
      """
      logger.info(f"Executing SQL statement: {statement[:100]}...")

      request_data = {
          "statement": statement,
          "warehouse_id": warehouse_id,
          "wait_timeout": "0s",  # 0s: return immediately; the statement runs asynchronously
          "row_limit": row_limit,
          "byte_limit": byte_limit,
      }

      if catalog:
          request_data["catalog"] = catalog

      if schema:
          request_data["schema"] = schema

      if parameters:
          request_data["parameters"] = parameters

      return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)
  ```
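
The handler returns its result as JSON inside a single text item, and the helper submits the statement with `wait_timeout` set to `"0s"`. In the Databricks Statement Execution API that value requests asynchronous execution, so the first response may carry only a `statement_id` and a non-terminal `status.state`. A caller could therefore decode the text item and poll until the statement finishes. The following is a minimal sketch under those assumptions: `unwrap_result`, `wait_for_statement`, and the `fetch_status` callable are hypothetical names, not part of the server. In practice `fetch_status` might wrap a GET on the statement's status endpoint.

```python
import json
import time
from typing import Any, Callable, Dict, List

# States after which a statement is not expected to change again.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED", "CLOSED"}


def unwrap_result(content: List[Dict[str, str]]) -> Dict[str, Any]:
    """Decode the handler's single text item; raise if it carries an error."""
    payload = json.loads(content[0]["text"])
    if isinstance(payload, dict) and "error" in payload:
        raise RuntimeError(payload["error"])
    return payload


def wait_for_statement(
    first_response: Dict[str, Any],
    fetch_status: Callable[[str], Dict[str, Any]],  # hypothetical status fetcher
    poll_interval: float = 1.0,
    max_polls: int = 60,
) -> Dict[str, Any]:
    """Poll until the statement reaches a terminal state or polls run out."""
    statement_id = first_response["statement_id"]
    response = first_response
    polls = 0
    while response.get("status", {}).get("state") not in TERMINAL_STATES:
        if polls >= max_polls:
            raise TimeoutError(f"Statement {statement_id} still running after {max_polls} polls")
        time.sleep(poll_interval)
        response = fetch_status(statement_id)
        polls += 1
    return response
```

Passing `fetch_status` in as a callable keeps the sketch independent of how the server issues HTTP requests, and the `max_polls` cap avoids waiting forever on a statement that never completes.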