bq_query_info_schema
Retrieve BigQuery metadata by querying INFORMATION_SCHEMA views to analyze tables, columns, storage, partitions, and other schema details for validation and planning purposes.
Instructions
Query INFORMATION_SCHEMA views for metadata
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query_type | Yes | Type of query (tables, columns, table_storage, partitions, views, routines, custom) | |
| dataset_id | Yes | The dataset to query metadata for | |
| project_id | No | GCP project ID (uses default if not provided) | |
| table_filter | No | Optional table name filter | |
| custom_query | No | Custom INFORMATION_SCHEMA query (when query_type is 'custom') | |
| limit | No | Maximum number of results | 100 |
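
For orientation, the snippet below shows a hypothetical set of arguments an MCP client could pass when calling this tool; the project, dataset, and filter names are placeholders, not values taken from the implementation.

```python
# Hypothetical bq_query_info_schema arguments; "my-project", "analytics",
# and "events" are placeholder values for illustration only.
arguments = {
    "query_type": "columns",     # one of: tables, columns, table_storage,
                                 # partitions, views, routines, custom
    "dataset_id": "analytics",
    "project_id": "my-project",  # optional; omit to use the default project
    "table_filter": "events",    # optional; restricts results to matching tables
    "limit": 50,                 # optional; defaults to 100
}
```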
Implementation Reference
- Primary handler function that validates the tool input with the Pydantic model and executes the INFORMATION_SCHEMA dry run via the implementation function.

  ```python
  async def query_info_schema(
      query_type: str,
      dataset_id: str,
      project_id: str | None = None,
      table_filter: str | None = None,
      custom_query: str | None = None,
      limit: int | None = 100,
  ) -> dict[str, Any]:
      """Execute INFORMATION_SCHEMA queries using dry-run validation."""
      try:
          request = validate_request(
              QueryInfoSchemaRequest,
              {
                  "query_type": query_type,
                  "dataset_id": dataset_id,
                  "project_id": project_id,
                  "table_filter": table_filter,
                  "custom_query": custom_query,
                  "limit": limit,
              },
          )
      except MCPBigQueryError as exc:
          details = exc.details if isinstance(exc.details, dict) else {}
          message_lower = exc.message.lower()
          if exc.code == "INVALID_PARAMETER" and (
              details.get("parameter") == "query_type" or "query_type" in message_lower
          ):
              allowed = sorted(INFO_SCHEMA_TEMPLATES.keys()) + ["custom"]
              custom_exc = MCPBigQueryError(
                  f"Invalid query type '{query_type}'.",
                  code="INVALID_QUERY_TYPE",
                  details=[{"allowed": allowed}],
              )
              return {"error": format_error_response(custom_exc)}
          return {"error": format_error_response(exc)}
      try:
          return await _query_info_schema_impl(request)
      except MCPBigQueryError as exc:
          return {"error": format_error_response(exc)}
      except Exception as exc:  # pragma: no cover - defensive guard
          logger.exception("Unexpected error while querying INFORMATION_SCHEMA")
          wrapped = MCPBigQueryError(str(exc), code="INFO_SCHEMA_ERROR")
          return {"error": format_error_response(wrapped)}
  ```
- Core implementation that builds the INFORMATION_SCHEMA query and performs a BigQuery dry run to validate it and collect metadata (a worked cost example follows this list).

  ```python
  async def _query_info_schema_impl(request: QueryInfoSchemaRequest) -> dict[str, Any]:
      client = get_bigquery_client(project_id=request.project_id)
      project = request.project_id or client.project
      query = _build_query(request, project)

      job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
      try:
          query_job = client.query(query, job_config=job_config)
      except BadRequest as exc:
          raise MCPBigQueryError(
              str(exc),
              code="QUERY_ERROR",
              details=[{"query": query}],
          ) from exc

      schema = []
      if query_job.schema:
          for field in query_job.schema:
              schema.append(
                  {
                      "name": field.name,
                      "type": field.field_type,
                      "mode": field.mode,
                      "description": field.description,
                  }
              )

      bytes_processed = query_job.total_bytes_processed or 0
      price_per_tib = get_config().price_per_tib
      bytes_per_tib = 1024**4
      estimated_cost_usd = (bytes_processed / bytes_per_tib) * price_per_tib

      result: dict[str, Any] = {
          "query_type": request.query_type,
          "dataset_id": request.dataset_id,
          "project": project,
          "query": query,
          "schema": schema,
          "metadata": {
              "total_bytes_processed": bytes_processed,
              "estimated_cost_usd": round(estimated_cost_usd, 6),
              "cache_hit": False,
          },
          "info": "Query validated successfully. Execute without dry_run to get actual results.",
      }
      if request.table_filter:
          result["table_filter"] = request.table_filter
      return result
  ```
- Pydantic model providing input validation and type definitions for the bq_query_info_schema tool parameters (a short validation sketch follows this list).

  ```python
  class QueryInfoSchemaRequest(BaseModel):
      """Request model for querying INFORMATION_SCHEMA."""

      query_type: str = Field(
          ..., pattern=r"^(tables|columns|table_storage|partitions|views|routines|custom)$"
      )
      dataset_id: str = Field(..., min_length=1, max_length=1024)
      project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)
      table_filter: str | None = Field(None, max_length=1024)
      custom_query: str | None = Field(None, max_length=DEFAULT_LIMITS["max_query_length"])
      limit: int = Field(DEFAULT_LIMITS["info_schema_limit"], ge=1, le=10000)

      @model_validator(mode="after")
      def validate_custom_query(self) -> QueryInfoSchemaRequest:
          """Validate custom_query is provided when query_type is 'custom'."""
          if self.query_type == "custom" and not self.custom_query:
              raise ValueError("custom_query is required when query_type is 'custom'")
          if self.query_type != "custom" and self.custom_query:
              raise ValueError("custom_query should only be provided when query_type is 'custom'")
          return self
  ```
- src/mcp_bigquery/server.py:267-300 (registration): tool registration in the MCP server's list_tools() method, including the name, description, and a JSON schema that mirrors the Pydantic model.

  ```python
  types.Tool(
      name="bq_query_info_schema",
      description=("Query INFORMATION_SCHEMA views for metadata"),
      inputSchema={
          "type": "object",
          "properties": {
              "query_type": {
                  "type": "string",
                  "description": "Type of query (tables, columns, table_storage, partitions, views, routines, custom)",
              },
              "dataset_id": {
                  "type": "string",
                  "description": "The dataset to query metadata for",
              },
              "project_id": {
                  "type": "string",
                  "description": "GCP project ID (uses default if not provided)",
              },
              "table_filter": {
                  "type": "string",
                  "description": "Optional table name filter",
              },
              "custom_query": {
                  "type": "string",
                  "description": "Custom INFORMATION_SCHEMA query (when query_type is 'custom')",
              },
              "limit": {
                  "type": "integer",
                  "description": "Maximum number of results (default: 100)",
              },
          },
          "required": ["query_type", "dataset_id"],
      },
  ),
  ```
- Dictionary of SQL templates for the different INFORMATION_SCHEMA query types, used by the handler to build queries dynamically (a rendered example follows this list).

  ```python
  INFO_SCHEMA_TEMPLATES = {
      "tables": """
          SELECT table_catalog, table_schema, table_name, table_type, creation_time, ddl
          FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLES`
          {where_clause}
          ORDER BY table_name
          {limit_clause}
      """,
      "columns": """
          SELECT table_catalog, table_schema, table_name, column_name, ordinal_position,
                 is_nullable, data_type, is_partitioning_column, clustering_ordinal_position
          FROM `{project}.{dataset}.INFORMATION_SCHEMA.COLUMNS`
          {where_clause}
          ORDER BY table_name, ordinal_position
          {limit_clause}
      """,
      "table_storage": """
          SELECT table_catalog, table_schema, table_name, creation_time, total_rows,
                 total_partitions, total_logical_bytes, active_logical_bytes,
                 long_term_logical_bytes, total_physical_bytes, active_physical_bytes,
                 long_term_physical_bytes, time_travel_physical_bytes
          FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLE_STORAGE`
          {where_clause}
          ORDER BY total_logical_bytes DESC
          {limit_clause}
      """,
      "partitions": """
          SELECT table_catalog, table_schema, table_name, partition_id, total_rows,
                 total_logical_bytes, total_physical_bytes, last_modified_time
          FROM `{project}.{dataset}.INFORMATION_SCHEMA.PARTITIONS`
          {where_clause}
          ORDER BY table_name, partition_id
          {limit_clause}
      """,
      "views": """
          SELECT table_catalog, table_schema, table_name, view_definition, use_standard_sql
          FROM `{project}.{dataset}.INFORMATION_SCHEMA.VIEWS`
          {where_clause}
          ORDER BY table_name
          {limit_clause}
      """,
      "routines": """
          SELECT routine_catalog, routine_schema, routine_name, routine_type, language,
                 routine_definition, created, last_altered
          FROM `{project}.{dataset}.INFORMATION_SCHEMA.ROUTINES`
          {where_clause}
          ORDER BY routine_name
          {limit_clause}
      """,
  }
  ```
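
To see how a template turns into a query, the sketch below renders the "columns" template by hand. The WHERE and LIMIT text is an assumption about what `_build_query` (not shown above) produces for a table filter, and the project and dataset names are placeholders.

```python
# Manual rendering of the "columns" template for illustration. The exact
# where_clause/limit_clause strings that _build_query generates are assumed,
# not taken from the implementation. Assumes INFO_SCHEMA_TEMPLATES is in scope.
rendered = INFO_SCHEMA_TEMPLATES["columns"].format(
    project="my-project",
    dataset="analytics",
    where_clause="WHERE table_name = 'events'",  # assumed filter form
    limit_clause="LIMIT 100",
)
print(rendered)  # SELECT ... FROM `my-project.analytics.INFORMATION_SCHEMA.COLUMNS` ...
```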
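The dry-run cost estimate in `_query_info_schema_impl` is plain arithmetic; the worked example below repeats it with made-up numbers. The 6.25 USD-per-TiB price is only an illustrative value for `get_config().price_per_tib`, not a documented default.

```python
# Worked example of the cost estimate with illustrative numbers.
bytes_processed = 10 * 1024**2       # suppose the dry run reports 10 MiB
price_per_tib = 6.25                 # assumed configured price in USD per TiB
bytes_per_tib = 1024**4

estimated_cost_usd = (bytes_processed / bytes_per_tib) * price_per_tib
print(round(estimated_cost_usd, 6))  # 6e-05
```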
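A minimal sketch of the custom-query validation in `QueryInfoSchemaRequest`, assuming the model can be imported from the package; the import path below is hypothetical and should be adjusted to wherever the model actually lives.

```python
from pydantic import ValidationError

from mcp_bigquery.models import QueryInfoSchemaRequest  # hypothetical import path

try:
    # query_type is "custom" but custom_query was not supplied.
    QueryInfoSchemaRequest(query_type="custom", dataset_id="analytics")
except ValidationError as exc:
    print(exc)  # mentions: custom_query is required when query_type is 'custom'
```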