bq_get_table_info
Retrieve BigQuery table metadata, including partitioning and clustering details, to validate schemas and optimize query performance.
Instructions
Get comprehensive table information including partitioning and clustering
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| table_id | Yes | The table ID | |
| dataset_id | Yes | The dataset ID | |
| project_id | No | GCP project ID (uses default if not provided) | |
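For reference, a minimal arguments payload satisfying this schema might look like the sketch below; the table, dataset, and project names are hypothetical.

```python
# Hypothetical arguments for a bq_get_table_info call. Only table_id and
# dataset_id are required; project_id falls back to the client's default.
arguments = {
    "table_id": "events",            # hypothetical table name
    "dataset_id": "analytics",       # hypothetical dataset name
    "project_id": "my-gcp-project",  # optional; omit to use the default project
}
```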
Implementation Reference
- The handler function `get_table_info` implements the core logic for the `bq_get_table_info` tool. It validates the input parameters with the `GetTableInfoRequest` Pydantic model, handles errors, and calls `_get_table_info_impl` to fetch and format comprehensive table metadata from BigQuery: schema field count, statistics, partitioning, clustering, constraints, encryption, and type-specific details.

```python
async def get_table_info(
    table_id: str,
    dataset_id: str,
    project_id: str | None = None,
) -> dict[str, Any]:
    """Return comprehensive metadata for a table."""
    try:
        request = validate_request(
            GetTableInfoRequest,
            {"table_id": table_id, "dataset_id": dataset_id, "project_id": project_id},
        )
    except MCPBigQueryError as exc:
        return {"error": format_error_response(exc)}
    try:
        return await _get_table_info_impl(request)
    except MCPBigQueryError as exc:
        return {"error": format_error_response(exc)}
    except Exception as exc:  # pragma: no cover - defensive guard
        logger.exception("Unexpected error while fetching table info")
        wrapped = MCPBigQueryError(str(exc), code="GET_TABLE_INFO_ERROR")
        return {"error": format_error_response(wrapped)}


async def _get_table_info_impl(request: GetTableInfoRequest) -> dict[str, Any]:
    client = get_bigquery_client(project_id=request.project_id)
    project = request.project_id or client.project
    try:
        table = client.get_table(f"{project}.{request.dataset_id}.{request.table_id}")
    except NotFound as exc:
        raise TableNotFoundError(request.table_id, request.dataset_id, project) from exc

    # Base metadata common to every table type.
    info: dict[str, Any] = {
        "table_id": request.table_id,
        "dataset_id": request.dataset_id,
        "project": project,
        "full_table_id": f"{project}.{request.dataset_id}.{request.table_id}",
        "table_type": table.table_type,
        "created": serialize_timestamp(table.created),
        "modified": serialize_timestamp(table.modified),
        "expires": serialize_timestamp(getattr(table, "expires", None)),
        "description": table.description,
        "labels": table.labels or {},
        "location": table.location,
        "self_link": getattr(table, "self_link", None),
        "etag": getattr(table, "etag", None),
        "encryption_configuration": (
            {"kms_key_name": table.encryption_configuration.kms_key_name}
            if getattr(table, "encryption_configuration", None)
            else None
        ),
        "friendly_name": getattr(table, "friendly_name", None),
        "statistics": table_statistics(table),
        "schema_field_count": len(table.schema) if table.schema else 0,
    }

    # Type-specific sections.
    if table.table_type == "TABLE":
        info["time_travel"] = {
            "max_time_travel_hours": getattr(table, "max_time_travel_hours", 168),
        }
    if table.table_type == "VIEW":
        info["view"] = {
            "query": getattr(table, "view_query", None),
            "use_legacy_sql": getattr(table, "view_use_legacy_sql", None),
        }

    # Optional sections, included only when present on the table.
    materialized = materialized_view_info(table)
    if materialized:
        info["materialized_view"] = materialized

    external = external_table_info(table)
    if external:
        info["external"] = external

    streaming = streaming_buffer_info(table)
    if streaming:
        info["streaming_buffer"] = streaming

    partitioning = partitioning_details(table)
    if partitioning:
        info["partitioning"] = partitioning

    clustering = clustering_fields(table)
    if clustering:
        info["clustering"] = {"fields": clustering}

    if getattr(table, "table_constraints", None):
        constraints = table.table_constraints
        info["table_constraints"] = {
            "primary_key": (
                constraints.primary_key.columns if constraints.primary_key else None
            ),
            "foreign_keys": (
                [
                    {
                        "name": fk.name,
                        "referenced_table": fk.referenced_table.table_id,
                        "column_references": fk.column_references,
                    }
                    for fk in constraints.foreign_keys
                ]
                if constraints.foreign_keys
                else []
            ),
        }

    return info
```
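A minimal sketch of calling the handler directly from Python. The import path `mcp_bigquery.handlers` is an assumption based on the file references in this section, and the table names are hypothetical.

```python
import asyncio

from mcp_bigquery.handlers import get_table_info  # hypothetical module path

async def main() -> None:
    info = await get_table_info(table_id="events", dataset_id="analytics")
    if "error" in info:
        print("lookup failed:", info["error"])
    else:
        # Partitioning/clustering keys are present only when configured.
        print(info["full_table_id"], info.get("partitioning"), info.get("clustering"))

asyncio.run(main())
```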
- src/mcp_bigquery/server.py:243-266 (registration) — MCP tool registration for `bq_get_table_info` in the `handle_list_tools` function, defining the tool name, description, and JSON schema for input validation.

```python
types.Tool(
    name="bq_get_table_info",
    description=(
        "Get comprehensive table information including partitioning and clustering"
    ),
    inputSchema={
        "type": "object",
        "properties": {
            "table_id": {
                "type": "string",
                "description": "The table ID",
            },
            "dataset_id": {
                "type": "string",
                "description": "The dataset ID",
            },
            "project_id": {
                "type": "string",
                "description": "GCP project ID (uses default if not provided)",
            },
        },
        "required": ["table_id", "dataset_id"],
    },
),
```
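For context, a sketch of how such a `types.Tool` entry is typically returned from a list-tools handler in the MCP Python SDK. The server name and the surrounding decorator are assumptions; only the tool definition itself comes from the source above.

```python
import mcp.types as types
from mcp.server import Server

server = Server("mcp-bigquery")  # hypothetical server name

@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    # The bq_get_table_info definition shown above is one entry in this list,
    # alongside the server's other BigQuery tools.
    return [
        types.Tool(
            name="bq_get_table_info",
            description=(
                "Get comprehensive table information including partitioning "
                "and clustering"
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "table_id": {"type": "string", "description": "The table ID"},
                    "dataset_id": {"type": "string", "description": "The dataset ID"},
                    "project_id": {
                        "type": "string",
                        "description": "GCP project ID (uses default if not provided)",
                    },
                },
                "required": ["table_id", "dataset_id"],
            },
        ),
    ]
```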
- Pydantic `BaseModel` for input validation (`GetTableInfoRequest`) used within the handler to validate the `table_id`, `dataset_id`, and `project_id` parameters with length and pattern constraints.

```python
class GetTableInfoRequest(BaseModel):
    """Request model for getting table info."""

    table_id: str = Field(..., min_length=1, max_length=1024)
    dataset_id: str = Field(..., min_length=1, max_length=1024)
    project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)
```
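A quick sketch of how this model rejects bad input. The `PROJECT_ID_PATTERN` value below is an assumption for illustration; the real constant lives in the source.

```python
from pydantic import BaseModel, Field, ValidationError

# Assumed pattern: GCP project IDs are 6-30 lowercase letters, digits, and
# hyphens, starting with a letter. The real PROJECT_ID_PATTERN may differ.
PROJECT_ID_PATTERN = r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$"

class GetTableInfoRequest(BaseModel):
    table_id: str = Field(..., min_length=1, max_length=1024)
    dataset_id: str = Field(..., min_length=1, max_length=1024)
    project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)

try:
    GetTableInfoRequest(table_id="", dataset_id="analytics")
except ValidationError as exc:
    print(exc)  # table_id fails the min_length=1 constraint
```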
- src/mcp_bigquery/server.py:378-384 (handler) — Dispatch branch in the MCP server's `handle_call_tool` that routes `bq_get_table_info` calls to the `get_table_info` implementation and formats the response as JSON text content.

```python
elif name == "bq_get_table_info":
    result = await get_table_info(
        table_id=arguments["table_id"],
        dataset_id=arguments["dataset_id"],
        project_id=arguments.get("project_id"),
    )
    return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
```
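For context, a sketch of the surrounding dispatch. The decorator, handler signature, and fallback branch are assumptions (reusing the `server` instance from the list-tools sketch above); only the `bq_get_table_info` branch comes from the source.

```python
import json
from typing import Any

import mcp.types as types

@server.call_tool()  # same hypothetical `server` instance as above
async def handle_call_tool(
    name: str, arguments: dict[str, Any]
) -> list[types.TextContent]:
    if name == "bq_get_table_info":
        result = await get_table_info(
            table_id=arguments["table_id"],
            dataset_id=arguments["dataset_id"],
            project_id=arguments.get("project_id"),
        )
        # The tool's dict result is serialized to pretty-printed JSON text.
        return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
    raise ValueError(f"Unknown tool: {name}")
```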