Skip to main content
Glama
caron14

BigQuery Validator

by caron14

bq_query_info_schema

Retrieve BigQuery metadata by querying INFORMATION_SCHEMA views to analyze tables, columns, storage, partitions, and other schema details for validation and planning purposes.

Instructions

Query INFORMATION_SCHEMA views for metadata

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| query_type | Yes | Type of query (tables, columns, table_storage, partitions, views, routines, custom) | — |
| dataset_id | Yes | The dataset to query metadata for | — |
| project_id | No | GCP project ID (uses default if not provided) | — |
| table_filter | No | Optional table name filter | — |
| custom_query | No | Custom INFORMATION_SCHEMA query (when query_type is 'custom') | — |
| limit | No | Maximum number of results | 100 |

Implementation Reference

  • Primary handler function that validates the tool input using Pydantic model and executes the INFORMATION_SCHEMA query dry-run via the implementation function.
    async def query_info_schema(
        query_type: str,
        dataset_id: str,
        project_id: str | None = None,
        table_filter: str | None = None,
        custom_query: str | None = None,
        limit: int | None = 100,
    ) -> dict[str, Any]:
        """Execute INFORMATION_SCHEMA queries using dry-run validation.

        Args:
            query_type: One of the INFO_SCHEMA_TEMPLATES keys, or "custom".
            dataset_id: Dataset whose metadata is queried.
            project_id: GCP project ID; the client default is used when None.
            table_filter: Optional table-name filter.
            custom_query: Raw INFORMATION_SCHEMA SQL, required iff query_type
                is "custom".
            limit: Maximum number of result rows (default 100).

        Returns:
            The dry-run result payload on success, or ``{"error": ...}`` when
            validation or execution fails (errors are returned, not raised).
        """
        try:
            request = validate_request(
                QueryInfoSchemaRequest,
                {
                    "query_type": query_type,
                    "dataset_id": dataset_id,
                    "project_id": project_id,
                    "table_filter": table_filter,
                    "custom_query": custom_query,
                    "limit": limit,
                },
            )
        except MCPBigQueryError as exc:
            details = exc.details if isinstance(exc.details, dict) else {}
            message_lower = exc.message.lower()
            # Re-wrap query_type validation failures with a friendlier error
            # that lists the allowed values.
            if exc.code == "INVALID_PARAMETER" and (
                details.get("parameter") == "query_type" or "query_type" in message_lower
            ):
                allowed = sorted(INFO_SCHEMA_TEMPLATES.keys()) + ["custom"]
                custom_exc = MCPBigQueryError(
                    f"Invalid query type '{query_type}'.",
                    code="INVALID_QUERY_TYPE",
                    # Fix: details must be a dict, not a single-element list —
                    # consumers (including the isinstance(exc.details, dict)
                    # check above) read details via .get() on a mapping.
                    details={"allowed": allowed},
                )
                return {"error": format_error_response(custom_exc)}

            return {"error": format_error_response(exc)}

        try:
            return await _query_info_schema_impl(request)
        except MCPBigQueryError as exc:
            return {"error": format_error_response(exc)}
        except Exception as exc:  # pragma: no cover - defensive guard
            logger.exception("Unexpected error while querying INFORMATION_SCHEMA")
            wrapped = MCPBigQueryError(str(exc), code="INFO_SCHEMA_ERROR")
            return {"error": format_error_response(wrapped)}
  • Core implementation that constructs the INFORMATION_SCHEMA query and performs BigQuery dry-run to validate and get metadata.
    async def _query_info_schema_impl(request: QueryInfoSchemaRequest) -> dict[str, Any]:
        """Build an INFORMATION_SCHEMA query and validate it via a BigQuery dry run.

        Returns a payload describing the validated query: the SQL text, the
        result schema reported by BigQuery, and bytes-processed / estimated
        cost metadata. Raises MCPBigQueryError (QUERY_ERROR) if BigQuery
        rejects the query.
        """
        client = get_bigquery_client(project_id=request.project_id)
        effective_project = request.project_id or client.project
        sql = _build_query(request, effective_project)

        # Dry run only: validates the query and reports bytes scanned without
        # executing it; caching is disabled so the byte estimate is real.
        dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        try:
            job = client.query(sql, job_config=dry_run_config)
        except BadRequest as exc:
            raise MCPBigQueryError(
                str(exc),
                code="QUERY_ERROR",
                details=[{"query": sql}],
            ) from exc

        # Result columns BigQuery reports for the (not-yet-executed) query.
        result_schema = [
            {
                "name": field.name,
                "type": field.field_type,
                "mode": field.mode,
                "description": field.description,
            }
            for field in (job.schema or [])
        ]

        # On-demand pricing estimate: bytes scanned converted to TiB.
        scanned_bytes = job.total_bytes_processed or 0
        cost_estimate = (scanned_bytes / 1024**4) * get_config().price_per_tib

        payload: dict[str, Any] = {
            "query_type": request.query_type,
            "dataset_id": request.dataset_id,
            "project": effective_project,
            "query": sql,
            "schema": result_schema,
            "metadata": {
                "total_bytes_processed": scanned_bytes,
                "estimated_cost_usd": round(cost_estimate, 6),
                "cache_hit": False,
            },
            "info": "Query validated successfully. Execute without dry_run to get actual results.",
        }

        if request.table_filter:
            payload["table_filter"] = request.table_filter

        return payload
  • Pydantic model providing input validation and type definitions for the bq_query_info_schema tool parameters.
    class QueryInfoSchemaRequest(BaseModel):
        """Request model for querying INFORMATION_SCHEMA."""

        # Must be one of the template names or the "custom" escape hatch.
        query_type: str = Field(
            ..., pattern=r"^(tables|columns|table_storage|partitions|views|routines|custom)$"
        )
        dataset_id: str = Field(
            ...,
            min_length=1,
            max_length=1024,
        )
        project_id: str | None = Field(
            None,
            pattern=PROJECT_ID_PATTERN,
        )
        table_filter: str | None = Field(
            None,
            max_length=1024,
        )
        custom_query: str | None = Field(
            None,
            max_length=DEFAULT_LIMITS["max_query_length"],
        )
        limit: int = Field(
            DEFAULT_LIMITS["info_schema_limit"],
            ge=1,
            le=10000,
        )

        @model_validator(mode="after")
        def validate_custom_query(self) -> QueryInfoSchemaRequest:
            """Ensure custom_query is supplied exactly when query_type is 'custom'."""
            is_custom = self.query_type == "custom"

            if is_custom and not self.custom_query:
                raise ValueError("custom_query is required when query_type is 'custom'")

            if self.custom_query and not is_custom:
                raise ValueError("custom_query should only be provided when query_type is 'custom'")

            return self
  • Tool registration in MCP server's list_tools() method, including name, description, and JSON schema matching the Pydantic model.
    # MCP tool registration entry returned from the server's list_tools().
    # NOTE(review): this JSON schema is hand-maintained and must stay in sync
    # with the QueryInfoSchemaRequest Pydantic model, which performs the real
    # validation (patterns, length limits, custom_query rules).
    types.Tool(
        name="bq_query_info_schema",
        description=("Query INFORMATION_SCHEMA views for metadata"),
        inputSchema={
            "type": "object",
            "properties": {
                "query_type": {
                    "type": "string",
                    "description": "Type of query (tables, columns, table_storage, partitions, views, routines, custom)",
                },
                "dataset_id": {
                    "type": "string",
                    "description": "The dataset to query metadata for",
                },
                "project_id": {
                    "type": "string",
                    "description": "GCP project ID (uses default if not provided)",
                },
                "table_filter": {
                    "type": "string",
                    "description": "Optional table name filter",
                },
                "custom_query": {
                    "type": "string",
                    "description": "Custom INFORMATION_SCHEMA query (when query_type is 'custom')",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results (default: 100)",
                },
            },
            # Only these two are mandatory; the rest fall back to defaults.
            "required": ["query_type", "dataset_id"],
        },
    ),
  • Dictionary of SQL templates for different INFORMATION_SCHEMA query types used by the handler to dynamically build queries.
    # SQL templates keyed by query_type (the "custom" type has no template —
    # the caller supplies its own SQL). Each template carries four placeholders
    # that are substituted when the final query is assembled (presumably by
    # _build_query — confirm against its implementation):
    #   {project}      - GCP project ID
    #   {dataset}      - dataset whose INFORMATION_SCHEMA views are queried
    #   {where_clause} - optional filter clause (e.g. by table name)
    #   {limit_clause} - optional LIMIT clause
    INFO_SCHEMA_TEMPLATES = {
        # One row per table/view in the dataset, including its DDL.
        "tables": """
            SELECT
                table_catalog,
                table_schema,
                table_name,
                table_type,
                creation_time,
                ddl
            FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLES`
            {where_clause}
            ORDER BY table_name
            {limit_clause}
        """,
        # One row per column, with partitioning/clustering info.
        "columns": """
            SELECT
                table_catalog,
                table_schema,
                table_name,
                column_name,
                ordinal_position,
                is_nullable,
                data_type,
                is_partitioning_column,
                clustering_ordinal_position
            FROM `{project}.{dataset}.INFORMATION_SCHEMA.COLUMNS`
            {where_clause}
            ORDER BY table_name, ordinal_position
            {limit_clause}
        """,
        # Storage accounting per table (logical/physical/time-travel bytes),
        # biggest tables first.
        "table_storage": """
            SELECT
                table_catalog,
                table_schema,
                table_name,
                creation_time,
                total_rows,
                total_partitions,
                total_logical_bytes,
                active_logical_bytes,
                long_term_logical_bytes,
                total_physical_bytes,
                active_physical_bytes,
                long_term_physical_bytes,
                time_travel_physical_bytes
            FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLE_STORAGE`
            {where_clause}
            ORDER BY total_logical_bytes DESC
            {limit_clause}
        """,
        # One row per partition of each partitioned table.
        "partitions": """
            SELECT
                table_catalog,
                table_schema,
                table_name,
                partition_id,
                total_rows,
                total_logical_bytes,
                total_physical_bytes,
                last_modified_time
            FROM `{project}.{dataset}.INFORMATION_SCHEMA.PARTITIONS`
            {where_clause}
            ORDER BY table_name, partition_id
            {limit_clause}
        """,
        # View definitions (SQL text) in the dataset.
        "views": """
            SELECT
                table_catalog,
                table_schema,
                table_name,
                view_definition,
                use_standard_sql
            FROM `{project}.{dataset}.INFORMATION_SCHEMA.VIEWS`
            {where_clause}
            ORDER BY table_name
            {limit_clause}
        """,
        # Stored routines (functions/procedures) with their definitions.
        "routines": """
            SELECT
                routine_catalog,
                routine_schema,
                routine_name,
                routine_type,
                language,
                routine_definition,
                created,
                last_altered
            FROM `{project}.{dataset}.INFORMATION_SCHEMA.ROUTINES`
            {where_clause}
            ORDER BY routine_name
            {limit_clause}
        """,
    }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caron14/mcp-bigquery'

If you have feedback or need assistance with the MCP directory API, please join our Discord server