bq_get_table_info

Retrieve BigQuery table metadata, including partitioning and clustering details, to validate schemas and optimize query performance.

Instructions

Get comprehensive table information including partitioning and clustering

Input Schema

Name        Required  Description                                    Default
table_id    Yes       The table ID                                   -
dataset_id  Yes       The dataset ID                                 -
project_id  No        GCP project ID (uses default if not provided)  -
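
For example, a request against a hypothetical `analytics.events` table (illustrative names, not from the source) would pass arguments such as:

    {
      "table_id": "events",
      "dataset_id": "analytics",
      "project_id": "my-gcp-project"
    }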

Implementation Reference

  • The handler function `get_table_info` implements the core logic for the `bq_get_table_info` tool. It validates the input parameters against the `GetTableInfoRequest` Pydantic model, handles errors, and calls `_get_table_info_impl` to fetch and format comprehensive table metadata from BigQuery, including the schema field count, statistics, partitioning, clustering, constraints, encryption, and type-specific details. A usage sketch follows the code.
    async def get_table_info(
        table_id: str,
        dataset_id: str,
        project_id: str | None = None,
    ) -> dict[str, Any]:
        """Return comprehensive metadata for a table."""
        try:
            request = validate_request(
                GetTableInfoRequest,
                {"table_id": table_id, "dataset_id": dataset_id, "project_id": project_id},
            )
        except MCPBigQueryError as exc:
            return {"error": format_error_response(exc)}
        try:
            return await _get_table_info_impl(request)
        except MCPBigQueryError as exc:
            return {"error": format_error_response(exc)}
        except Exception as exc:  # pragma: no cover - defensive guard
            logger.exception("Unexpected error while fetching table info")
            wrapped = MCPBigQueryError(str(exc), code="GET_TABLE_INFO_ERROR")
            return {"error": format_error_response(wrapped)}


    async def _get_table_info_impl(request: GetTableInfoRequest) -> dict[str, Any]:
        client = get_bigquery_client(project_id=request.project_id)
        project = request.project_id or client.project
        try:
            table = client.get_table(f"{project}.{request.dataset_id}.{request.table_id}")
        except NotFound as exc:
            raise TableNotFoundError(request.table_id, request.dataset_id, project) from exc

        info: dict[str, Any] = {
            "table_id": request.table_id,
            "dataset_id": request.dataset_id,
            "project": project,
            "full_table_id": f"{project}.{request.dataset_id}.{request.table_id}",
            "table_type": table.table_type,
            "created": serialize_timestamp(table.created),
            "modified": serialize_timestamp(table.modified),
            "expires": serialize_timestamp(getattr(table, "expires", None)),
            "description": table.description,
            "labels": table.labels or {},
            "location": table.location,
            "self_link": getattr(table, "self_link", None),
            "etag": getattr(table, "etag", None),
            "encryption_configuration": (
                {"kms_key_name": table.encryption_configuration.kms_key_name}
                if getattr(table, "encryption_configuration", None)
                else None
            ),
            "friendly_name": getattr(table, "friendly_name", None),
            "statistics": table_statistics(table),
            "schema_field_count": len(table.schema) if table.schema else 0,
        }

        if table.table_type == "TABLE":
            info["time_travel"] = {
                "max_time_travel_hours": getattr(table, "max_time_travel_hours", 168),
            }

        if table.table_type == "VIEW":
            info["view"] = {
                "query": getattr(table, "view_query", None),
                "use_legacy_sql": getattr(table, "view_use_legacy_sql", None),
            }

        materialized = materialized_view_info(table)
        if materialized:
            info["materialized_view"] = materialized

        external = external_table_info(table)
        if external:
            info["external"] = external

        streaming = streaming_buffer_info(table)
        if streaming:
            info["streaming_buffer"] = streaming

        partitioning = partitioning_details(table)
        if partitioning:
            info["partitioning"] = partitioning

        clustering = clustering_fields(table)
        if clustering:
            info["clustering"] = {"fields": clustering}

        if getattr(table, "table_constraints", None):
            constraints = table.table_constraints
            info["table_constraints"] = {
                "primary_key": (
                    constraints.primary_key.columns if constraints.primary_key else None
                ),
                "foreign_keys": (
                    [
                        {
                            "name": fk.name,
                            "referenced_table": fk.referenced_table.table_id,
                            "column_references": fk.column_references,
                        }
                        for fk in constraints.foreign_keys
                    ]
                    if constraints.foreign_keys
                    else []
                ),
            }

        return info
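
    A minimal usage sketch, calling the handler directly. The table and dataset names are illustrative and assume default GCP credentials:

    import asyncio

    async def main() -> None:
        # Illustrative identifiers; substitute any table in your default project.
        info = await get_table_info(table_id="events", dataset_id="analytics")
        if "error" in info:
            print("lookup failed:", info["error"])
        else:
            print(info["full_table_id"], info["table_type"])
            print("partitioning:", info.get("partitioning"))
            print("clustering:", info.get("clustering"))

    asyncio.run(main())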
  • MCP tool registration for `bq_get_table_info` in the `handle_list_tools` function, defining the tool name, description, and JSON schema for input validation.
    types.Tool(
        name="bq_get_table_info",
        description=(
            "Get comprehensive table information including partitioning and clustering"
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "table_id": {
                    "type": "string",
                    "description": "The table ID",
                },
                "dataset_id": {
                    "type": "string",
                    "description": "The dataset ID",
                },
                "project_id": {
                    "type": "string",
                    "description": "GCP project ID (uses default if not provided)",
                },
            },
            "required": ["table_id", "dataset_id"],
        },
    ),
  • The Pydantic `BaseModel` used for input validation (`GetTableInfoRequest`), which constrains the `table_id`, `dataset_id`, and `project_id` parameters with length and pattern checks. A sketch of the `validate_request` helper it pairs with follows the code.
    class GetTableInfoRequest(BaseModel):
        """Request model for getting table info."""

        table_id: str = Field(..., min_length=1, max_length=1024)
        dataset_id: str = Field(..., min_length=1, max_length=1024)
        project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)
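
    The `validate_request` helper invoked by the handler is not shown in this excerpt. Below is a minimal sketch of one plausible implementation, assuming it converts Pydantic validation failures into the server's `MCPBigQueryError` type (the `code` value here is an assumption):

    from typing import Any, TypeVar

    from pydantic import BaseModel, ValidationError

    T = TypeVar("T", bound=BaseModel)

    def validate_request(model: type[T], payload: dict[str, Any]) -> T:
        # Hypothetical sketch: parse the raw payload with the given Pydantic model
        # and re-raise validation failures as the project's error type.
        # MCPBigQueryError is the server's own exception class (import path omitted).
        try:
            return model.model_validate(payload)
        except ValidationError as exc:
            raise MCPBigQueryError(str(exc), code="VALIDATION_ERROR") from exc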
  • The dispatch handler in the MCP server's `handle_call_tool` function that routes `bq_get_table_info` calls to `get_table_info` and formats the response as JSON text content. A client-side invocation sketch follows the code.
    elif name == "bq_get_table_info":
        result = await get_table_info(
            table_id=arguments["table_id"],
            dataset_id=arguments["dataset_id"],
            project_id=arguments.get("project_id"),
        )
        return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
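
A sketch of invoking the tool from a Python MCP client over stdio, using the official `mcp` SDK. The `mcp-bigquery` launch command and the table and dataset names are assumptions; adjust them to your setup:

    import asyncio
    import json

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    async def main() -> None:
        # Assumed launch command for the server; adjust to your installation.
        params = StdioServerParameters(command="mcp-bigquery")
        async with stdio_client(params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                result = await session.call_tool(
                    "bq_get_table_info",
                    {"table_id": "events", "dataset_id": "analytics"},  # illustrative
                )
                # The dispatch above returns a single TextContent item holding JSON.
                print(json.loads(result.content[0].text))

    asyncio.run(main())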

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caron14/mcp-bigquery'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.