BigQuery Validator

by caron14

bq_dry_run_sql

Validate BigQuery SQL queries without execution to preview schema and estimate costs before running.

Instructions

Perform a dry-run of a BigQuery SQL query to get cost estimates and metadata

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| sql | Yes | The SQL query to dry-run | — |
| params | No | Optional query parameters (key-value pairs) | — |
| pricePerTiB | No | Price per TiB for cost estimation | `SAFE_PRICE_PER_TIB` env var, else 5.0 |
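As a sketch, a hypothetical arguments payload matching the schema above might look like the following; the table path, query, and parameter values are illustrative placeholders, not from the source:

```python
import json

# Hypothetical arguments for bq_dry_run_sql; only "sql" is required.
arguments = {
    "sql": "SELECT name FROM `my-project.my_dataset.users` WHERE city = @city",
    "params": {"city": "Tokyo"},
    "pricePerTiB": 6.25,
}
print(json.dumps(arguments, indent=2))
```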

Implementation Reference

  • The core handler that implements the bq_dry_run_sql tool: it performs a dry-run of the query against BigQuery, estimates the processing cost, and extracts the referenced tables and the result schema.
    async def dry_run_sql(
        sql: str,
        params: dict[str, Any] | None = None,
        price_per_tib: float | None = None,
    ) -> dict[str, Any]:
        """
        Perform a dry-run of a BigQuery SQL query.
    
        Args:
            sql: The SQL query to dry-run
            params: Optional query parameters
            price_per_tib: Price per TiB for cost estimation
    
        Returns:
            Dict with totalBytesProcessed, usdEstimate, referencedTables,
            and schemaPreview
            or error details if the query is invalid
        """
        try:
            client = get_bigquery_client()
    
            job_config = bigquery.QueryJobConfig(
                dry_run=True,
                use_query_cache=False,
                query_parameters=build_query_parameters(params),
            )
    
            query_job = client.query(sql, job_config=job_config)
    
            # Get price per TiB (precedence: arg > env > default)
            if price_per_tib is None:
                price_per_tib = float(os.environ.get("SAFE_PRICE_PER_TIB", "5.0"))
    
            # Calculate cost estimate
            bytes_processed = query_job.total_bytes_processed or 0
            tib_processed = bytes_processed / (2**40)
            usd_estimate = round(tib_processed * price_per_tib, 6)
    
            # Extract referenced tables
            referenced_tables = []
            if query_job.referenced_tables:
                for table_ref in query_job.referenced_tables:
                    referenced_tables.append(
                        {
                            "project": table_ref.project,
                            "dataset": table_ref.dataset_id,
                            "table": table_ref.table_id,
                        }
                    )
    
            # Extract schema preview
            schema_preview = []
            if query_job.schema:
                for field in query_job.schema:
                    schema_preview.append(
                        {
                            "name": field.name,
                            "type": field.field_type,
                            "mode": field.mode,
                        }
                    )
    
            return {
                "totalBytesProcessed": bytes_processed,
                "usdEstimate": usd_estimate,
                "referencedTables": referenced_tables,
                "schemaPreview": schema_preview,
            }
    
        except BadRequest as e:
            error_msg = str(e)
            # Improve error message clarity
            if "Table not found" in error_msg:
                error_msg = (
                    f"Table not found. {error_msg}. Please verify the table exists and you have access."
                )
            elif "Column not found" in error_msg:
                error_msg = f"Column not found. {error_msg}. Please check column names and spelling."
    
            error_result = {"error": {"code": "INVALID_SQL", "message": error_msg}}
    
            location = extract_error_location(error_msg)
            if location:
                error_result["error"]["location"] = location
    
            if hasattr(e, "errors") and e.errors:
                error_result["error"]["details"] = e.errors
    
            return error_result
    
        except Exception as e:
            # Provide more context for common errors
            error_msg = str(e)
            if "credentials" in error_msg.lower():
                error_msg = f"Authentication error: {error_msg}. Please run 'gcloud auth application-default login' to set up credentials."
            elif "permission" in error_msg.lower():
                error_msg = f"Permission denied: {error_msg}. Please verify you have the necessary BigQuery permissions."
    
            return {"error": {"code": "UNKNOWN_ERROR", "message": error_msg}}
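The cost estimate in the handler above is plain arithmetic: bytes processed are converted to TiB (2**40 bytes) and multiplied by the price. A minimal standalone sketch of just that step:

```python
def estimate_usd(bytes_processed: int, price_per_tib: float = 5.0) -> float:
    """Mirror the handler's cost math: bytes -> TiB -> USD, rounded to 6 places."""
    tib_processed = bytes_processed / (2**40)
    return round(tib_processed * price_per_tib, 6)

# One full TiB at the default $5.0/TiB costs exactly $5.0
print(estimate_usd(2**40))  # 5.0
```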
  • Registers the bq_dry_run_sql tool in the MCP server's list of tools, including its description and input schema definition.
    types.Tool(
        name="bq_dry_run_sql",
        description="Perform a dry-run of a BigQuery SQL query to get cost estimates and metadata",
        inputSchema={
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "The SQL query to dry-run",
                },
                "params": {
                    "type": "object",
                "description": "Optional query parameters (key-value pairs)",
                    "additionalProperties": True,
                },
                "pricePerTiB": {
                    "type": "number",
                "description": "Price per TiB for cost estimation (defaults to env var or 5.0)",
                },
            },
            "required": ["sql"],
        },
    ),
  • Dispatch handler in the main call_tool function that routes requests for bq_dry_run_sql to the dry_run_sql implementation.
    elif name == "bq_dry_run_sql":
        result = await dry_run_sql(
            sql=arguments["sql"],
            params=arguments.get("params"),
            price_per_tib=arguments.get("pricePerTiB"),
        )
        return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
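For illustration, here is a hypothetical successful result serialized the same way the dispatcher does; all field values are invented, only the key names come from the handler above:

```python
import json

# Hypothetical dry-run result; values are made up for illustration.
result = {
    "totalBytesProcessed": 2**40,
    "usdEstimate": 5.0,
    "referencedTables": [
        {"project": "my-project", "dataset": "my_dataset", "table": "users"}
    ],
    "schemaPreview": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}],
}
text = json.dumps(result, indent=2)  # this string becomes the TextContent body
print(text)
```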
  • Helper function used by dry_run_sql to convert the input params dictionary into a list of BigQuery ScalarQueryParameter objects.
    def build_query_parameters(params: dict[str, Any] | None) -> list[bigquery.ScalarQueryParameter]:
        """
        Build BigQuery query parameters from a dictionary.
    
        Initial implementation treats all values as STRING type.
    
        Args:
            params: Dictionary of parameter names to values
    
        Returns:
            List of ScalarQueryParameter objects
        """
        if not params:
            return []
    
        return [
            bigquery.ScalarQueryParameter(name, "STRING", str(value)) for name, value in params.items()
        ]
