execute_sql
Run SQL queries on Databricks resources by providing a SQL statement, warehouse ID, and optional catalog or schema. Connects via Databricks MCP Server for SQL execution.
Instructions
Execute a SQL statement with parameters: statement (required), warehouse_id (required), catalog (optional), schema (optional)
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| params | Yes |
Implementation Reference
- MCP tool handler and registration for 'execute_sql'. Extracts parameters from input and delegates execution to the sql module's execute_sql function, returning JSON results or error.@self.tool( name="execute_sql", description="Execute a SQL statement with parameters: statement (required), warehouse_id (required), catalog (optional), schema (optional)", ) async def execute_sql(params: Dict[str, Any]) -> List[TextContent]: logger.info(f"Executing SQL with params: {params}") try: statement = params.get("statement") warehouse_id = params.get("warehouse_id") catalog = params.get("catalog") schema = params.get("schema") result = await sql.execute_sql(statement, warehouse_id, catalog, schema) return [{"text": json.dumps(result)}] except Exception as e: logger.error(f"Error executing SQL: {str(e)}") return [{"text": json.dumps({"error": str(e)})}]
- src/api/sql.py:14-60 (helper)Core SQL execution logic via Databricks API (/api/2.0/sql/statements/execute). Likely the target of the handler's delegation (possibly aliased or intended as execute_sql). Handles API request with parameters, limits, and error handling.async def execute_statement( statement: str, warehouse_id: str, catalog: Optional[str] = None, schema: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, row_limit: int = 10000, byte_limit: int = 100000000, # 100MB ) -> Dict[str, Any]: """ Execute a SQL statement. Args: statement: The SQL statement to execute warehouse_id: ID of the SQL warehouse to use catalog: Optional catalog to use schema: Optional schema to use parameters: Optional statement parameters row_limit: Maximum number of rows to return byte_limit: Maximum number of bytes to return Returns: Response containing query results Raises: DatabricksAPIError: If the API request fails """ logger.info(f"Executing SQL statement: {statement[:100]}...") request_data = { "statement": statement, "warehouse_id": warehouse_id, "wait_timeout": "0s", # Wait indefinitely "row_limit": row_limit, "byte_limit": byte_limit, } if catalog: request_data["catalog"] = catalog if schema: request_data["schema"] = schema if parameters: request_data["parameters"] = parameters return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)