bq_dry_run_sql
Analyze BigQuery SQL queries with a dry-run to estimate costs, validate syntax, and preview schema without executing the query.
Instructions
Perform a dry-run of a BigQuery SQL query to get cost estimates and metadata
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| params | No | Optional query parameters (key-value pairs) | |
| pricePerTiB | No | Price per TiB for cost estimation (defaults to env var or 5.0) | |
| sql | Yes | The SQL query to dry-run | |
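
As an illustration of the table above, the arguments for one call might be assembled as in the sketch below. The query text, the table name `my_project.my_dataset.users`, the parameter name `min_age`, the price `6.25`, and the `check_arguments` helper are all hypothetical and are not part of the server; the helper only mirrors the types and the single required field listed in the schema.

```python
import json

# Hypothetical arguments for bq_dry_run_sql; only "sql" is required.
# The server's helper passes every parameter as STRING (per its docstring),
# so this example query casts @min_age explicitly.
arguments = {
    "sql": (
        "SELECT name FROM `my_project.my_dataset.users` "
        "WHERE age >= CAST(@min_age AS INT64)"
    ),
    "params": {"min_age": 18},
    "pricePerTiB": 6.25,
}


def check_arguments(args: dict) -> list:
    """Return a list of problems; an empty list means the shape looks valid."""
    problems = []
    if not isinstance(args.get("sql"), str):
        problems.append("sql is required and must be a string")
    if "params" in args and not isinstance(args["params"], dict):
        problems.append("params must be an object")
    if "pricePerTiB" in args:
        price = args["pricePerTiB"]
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(price, bool) or not isinstance(price, (int, float)):
            problems.append("pricePerTiB must be a number")
    return problems


print(check_arguments(arguments))
print(json.dumps(arguments, indent=2))
```

Omitting `params` and `pricePerTiB` leaves a valid request, since the schema marks only `sql` as required.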
Implementation Reference
- src/mcp_bigquery/server.py:466-565 (handler) The main handler function that performs the BigQuery dry-run SQL operation. It executes a dry-run query, calculates estimated costs, extracts referenced tables and output schema preview, and handles errors with detailed reporting.

  ```python
  async def dry_run_sql(
      sql: str,
      params: dict[str, Any] | None = None,
      price_per_tib: float | None = None,
  ) -> dict[str, Any]:
      """
      Perform a dry-run of a BigQuery SQL query.

      Args:
          sql: The SQL query to dry-run
          params: Optional query parameters
          price_per_tib: Price per TiB for cost estimation

      Returns:
          Dict with totalBytesProcessed, usdEstimate, referencedTables, and schemaPreview
          or error details if the query is invalid
      """
      try:
          client = get_bigquery_client()

          job_config = bigquery.QueryJobConfig(
              dry_run=True,
              use_query_cache=False,
              query_parameters=build_query_parameters(params),
          )

          query_job = client.query(sql, job_config=job_config)

          # Get price per TiB (precedence: arg > env > default)
          if price_per_tib is None:
              price_per_tib = float(os.environ.get("SAFE_PRICE_PER_TIB", "5.0"))

          # Calculate cost estimate
          bytes_processed = query_job.total_bytes_processed or 0
          tib_processed = bytes_processed / (2**40)
          usd_estimate = round(tib_processed * price_per_tib, 6)

          # Extract referenced tables
          referenced_tables = []
          if query_job.referenced_tables:
              for table_ref in query_job.referenced_tables:
                  referenced_tables.append(
                      {
                          "project": table_ref.project,
                          "dataset": table_ref.dataset_id,
                          "table": table_ref.table_id,
                      }
                  )

          # Extract schema preview
          schema_preview = []
          if query_job.schema:
              for field in query_job.schema:
                  schema_preview.append(
                      {
                          "name": field.name,
                          "type": field.field_type,
                          "mode": field.mode,
                      }
                  )

          return {
              "totalBytesProcessed": bytes_processed,
              "usdEstimate": usd_estimate,
              "referencedTables": referenced_tables,
              "schemaPreview": schema_preview,
          }

      except BadRequest as e:
          error_msg = str(e)

          # Improve error message clarity
          if "Table not found" in error_msg:
              error_msg = (
                  f"Table not found. {error_msg}. "
                  "Please verify the table exists and you have access."
              )
          elif "Column not found" in error_msg:
              error_msg = f"Column not found. {error_msg}. Please check column names and spelling."

          error_result = {"error": {"code": "INVALID_SQL", "message": error_msg}}

          location = extract_error_location(error_msg)
          if location:
              error_result["error"]["location"] = location

          if hasattr(e, "errors") and e.errors:
              error_result["error"]["details"] = e.errors

          return error_result

      except Exception as e:
          # Provide more context for common errors
          error_msg = str(e)
          if "credentials" in error_msg.lower():
              error_msg = f"Authentication error: {error_msg}. Please run 'gcloud auth application-default login' to set up credentials."
          elif "permission" in error_msg.lower():
              error_msg = f"Permission denied: {error_msg}. Please verify you have the necessary BigQuery permissions."
          return {"error": {"code": "UNKNOWN_ERROR", "message": error_msg}}
  ```
- src/mcp_bigquery/server.py:99-118 (schema) Input schema definition for the bq_dry_run_sql tool, specifying the required SQL query and the optional parameters and price per TiB.

  ```python
  inputSchema={
      "type": "object",
      "properties": {
          "sql": {
              "type": "string",
              "description": "The SQL query to dry-run",
          },
          "params": {
              "type": "object",
              "description": ("Optional query parameters (key-value pairs)"),
              "additionalProperties": True,
          },
          "pricePerTiB": {
              "type": "number",
              "description": (
                  "Price per TiB for cost estimation "
                  "(defaults to env var or 5.0)"
              ),
          },
      },
      "required": ["sql"],
  ```
- src/mcp_bigquery/server.py:94-120 (registration) Tool registration in the MCP server's list_tools() method, defining name, description, and input schema.

  ```python
  types.Tool(
      name="bq_dry_run_sql",
      description=(
          "Perform a dry-run of a BigQuery SQL query to get cost "
          "estimates and metadata"
      ),
      inputSchema={
          "type": "object",
          "properties": {
              "sql": {
                  "type": "string",
                  "description": "The SQL query to dry-run",
              },
              "params": {
                  "type": "object",
                  "description": ("Optional query parameters (key-value pairs)"),
                  "additionalProperties": True,
              },
              "pricePerTiB": {
                  "type": "number",
                  "description": (
                      "Price per TiB for cost estimation "
                      "(defaults to env var or 5.0)"
                  ),
              },
          },
          "required": ["sql"],
      },
  ),
  ```
- src/mcp_bigquery/server.py:339-345 (registration) Dispatch logic in the MCP server's call_tool() handler that routes requests for bq_dry_run_sql to the dry_run_sql implementation.

  ```python
  elif name == "bq_dry_run_sql":
      result = await dry_run_sql(
          sql=arguments["sql"],
          params=arguments.get("params"),
          price_per_tib=arguments.get("pricePerTiB"),
      )
      return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
  ```
- src/mcp_bigquery/server.py:50-68 (helper) Helper function used by the dry_run_sql handler to build BigQuery query parameters from the input dictionary.

  ```python
  def build_query_parameters(params: dict[str, Any] | None) -> list[bigquery.ScalarQueryParameter]:
      """
      Build BigQuery query parameters from a dictionary.

      Initial implementation treats all values as STRING type.

      Args:
          params: Dictionary of parameter names to values

      Returns:
          List of ScalarQueryParameter objects
      """
      if not params:
          return []

      return [
          bigquery.ScalarQueryParameter(name, "STRING", str(value))
          for name, value in params.items()
      ]
  ```
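
The handler's cost arithmetic and its two result shapes can be tried without a BigQuery client. The sketch below is a standalone approximation, not code from the server: `estimate_usd` and `summarize` are hypothetical names, and the `env` argument is an assumption added here so the `SAFE_PRICE_PER_TIB` lookup can be exercised without touching the real environment.

```python
import os

BYTES_PER_TIB = 2**40  # same divisor the handler uses


def estimate_usd(bytes_processed, price_per_tib=None, env=None):
    """Convert bytes to TiB, multiply by the price, round to 6 decimals.

    Price precedence follows the handler: argument > env var > 5.0.
    """
    env = os.environ if env is None else env
    if price_per_tib is None:
        price_per_tib = float(env.get("SAFE_PRICE_PER_TIB", "5.0"))
    tib_processed = (bytes_processed or 0) / BYTES_PER_TIB
    return round(tib_processed * price_per_tib, 6)


def summarize(result):
    """Render a one-line summary of a success or error result dict."""
    if "error" in result:
        err = result["error"]
        return f"{err['code']}: {err['message']}"
    tables = ", ".join(
        f"{t['project']}.{t['dataset']}.{t['table']}"
        for t in result["referencedTables"]
    )
    return (
        f"{result['totalBytesProcessed']} bytes, "
        f"~${result['usdEstimate']} USD, tables: {tables or 'none'}"
    )


print(estimate_usd(2**40, 5.0))          # 5.0 (exactly one TiB at $5/TiB)
print(estimate_usd(1_000_000_000, 5.0))  # 0.004547 (about 1 GB scanned)
print(summarize({"error": {"code": "INVALID_SQL", "message": "bad query"}}))
```

Because the handler returns errors as a dict rather than raising, a caller has to check for the `error` key before reading `usdEstimate`.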