
bq_list_tables

Lists tables in a BigQuery dataset with metadata to validate queries and preview schemas before execution.

Instructions

List all tables in a BigQuery dataset with metadata

Input Schema

Name               Required  Description                                                       Default
dataset_id         Yes       The dataset ID
project_id         No        GCP project ID (uses default if not provided)
max_results        No        Maximum number of tables to return
table_type_filter  No        Filter by table types (TABLE, VIEW, EXTERNAL, MATERIALIZED_VIEW)
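For illustration, a call to this tool passes an arguments object matching the schema above. The values below are hypothetical, and the minimal check only mirrors the schema's single `required` field:

```python
# Hypothetical arguments for bq_list_tables, matching the input schema above.
arguments = {
    "dataset_id": "analytics",               # required
    "project_id": "my-gcp-project",          # optional; defaults to the client's project
    "max_results": 50,                       # optional cap on returned tables
    "table_type_filter": ["TABLE", "VIEW"],  # optional type filter
}

def check_arguments(args: dict) -> bool:
    """Client-side sanity check mirroring the schema's required list."""
    return isinstance(args.get("dataset_id"), str) and len(args["dataset_id"]) > 0

print(check_arguments(arguments))  # True
```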

Implementation Reference

  • Core implementation of the `list_tables` handler: validates input, fetches tables from BigQuery dataset using client.list_tables, filters by type, enriches with metadata (partitioning, clustering, stats), and returns formatted JSON response.
    async def list_tables(
        dataset_id: str,
        project_id: str | None = None,
        max_results: int | None = None,
        table_type_filter: list[str] | None = None,
    ) -> dict[str, Any]:
        """List tables in a dataset."""
        try:
            request = validate_request(
                ListTablesRequest,
                {
                    "dataset_id": dataset_id,
                    "project_id": project_id,
                    "max_results": max_results,
                    "table_type_filter": table_type_filter,
                },
            )
        except MCPBigQueryError as exc:
            return {"error": format_error_response(exc)}
        try:
            return await _list_tables_impl(request)
        except MCPBigQueryError as exc:
            return {"error": format_error_response(exc)}
        except Exception as exc:  # pragma: no cover - defensive guard
            logger.exception("Unexpected error while listing tables")
            wrapped = MCPBigQueryError(str(exc), code="LIST_TABLES_ERROR")
            return {"error": format_error_response(wrapped)}

    async def _list_tables_impl(request: ListTablesRequest) -> dict[str, Any]:
        client = get_bigquery_client(project_id=request.project_id)
        project = request.project_id or client.project
        try:
            list_kwargs: dict[str, Any] = {"dataset": f"{project}.{request.dataset_id}"}
            if request.max_results is not None:
                list_kwargs["max_results"] = request.max_results
            iterator = client.list_tables(**list_kwargs)
        except NotFound as exc:
            raise DatasetNotFoundError(request.dataset_id, project) from exc

        allowed_types = set(request.table_type_filter) if request.table_type_filter else None
        tables: list[dict[str, Any]] = []
        for table in iterator:
            try:
                table_ref = client.get_table(table.reference)
            except NotFound as exc:
                raise TableNotFoundError(table.table_id, request.dataset_id, project) from exc
            table_type = table_ref.table_type
            if allowed_types and table_type not in allowed_types:
                continue
            partitioning = partitioning_overview(table_ref)
            clustering = clustering_fields(table_ref)
            table_info: dict[str, Any] = {
                "table_id": table.table_id,
                "dataset_id": table.dataset_id,
                "project": table.project,
                "table_type": table_type,
                "created": serialize_timestamp(table_ref.created),
                "modified": serialize_timestamp(table_ref.modified),
                "description": table_ref.description,
                "labels": table_ref.labels or {},
                "num_bytes": getattr(table_ref, "num_bytes", None),
                "num_rows": getattr(table_ref, "num_rows", None),
                "location": table_ref.location,
            }
            if partitioning:
                table_info["partitioning"] = partitioning
            if clustering:
                table_info["clustering_fields"] = clustering
            tables.append(table_info)

        return {
            "dataset_id": request.dataset_id,
            "project": project,
            "table_count": len(tables),
            "tables": tables,
        }
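Based on the handler above, a successful response is a JSON-serializable dict whose shape is sketched below. All values here are hypothetical; `table_count` always equals the length of the (filtered) `tables` list:

```python
import json

# Hypothetical successful response from bq_list_tables (shape taken from the handler above).
response = {
    "dataset_id": "analytics",
    "project": "my-gcp-project",
    "table_count": 1,
    "tables": [
        {
            "table_id": "events",
            "dataset_id": "analytics",
            "project": "my-gcp-project",
            "table_type": "TABLE",
            "created": "2024-01-01T00:00:00+00:00",
            "modified": "2024-06-01T00:00:00+00:00",
            "description": None,
            "labels": {},
            "num_bytes": 1024,
            "num_rows": 10,
            "location": "US",
        }
    ],
}

# The invariant maintained by _list_tables_impl: count matches the filtered list.
assert response["table_count"] == len(response["tables"])
print(json.dumps(response, indent=2)[:30])
```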
  • MCP Tool schema definition for 'bq_list_tables', including input schema with properties for dataset_id (required), project_id, max_results, and table_type_filter.
    types.Tool(
        name="bq_list_tables",
        description=("List all tables in a BigQuery dataset with metadata"),
        inputSchema={
            "type": "object",
            "properties": {
                "dataset_id": {
                    "type": "string",
                    "description": "The dataset ID",
                },
                "project_id": {
                    "type": "string",
                    "description": "GCP project ID (uses default if not provided)",
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of tables to return",
                },
                "table_type_filter": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Filter by table types (TABLE, VIEW, EXTERNAL, MATERIALIZED_VIEW)",
                },
            },
            "required": ["dataset_id"],
        },
    ),
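The schema's `required` list and per-property types can be exercised with a minimal stdlib checker. This is an illustrative sketch of simplified JSON Schema checking, not the server's actual validation path (which goes through Pydantic):

```python
# Maps the JSON Schema primitive types used above to Python types.
SCHEMA_TYPES = {"string": str, "integer": int, "array": list}

def matches_schema(args: dict, schema: dict) -> list[str]:
    """Return a list of violations of a (simplified) JSON Schema object."""
    errors = []
    for key in schema.get("required", []):
        if key not in args:
            errors.append(f"missing required field: {key}")
    for key, value in args.items():
        prop = schema["properties"].get(key)
        if prop and not isinstance(value, SCHEMA_TYPES[prop["type"]]):
            errors.append(f"{key}: expected {prop['type']}")
    return errors

schema = {
    "type": "object",
    "properties": {
        "dataset_id": {"type": "string"},
        "project_id": {"type": "string"},
        "max_results": {"type": "integer"},
        "table_type_filter": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["dataset_id"],
}

print(matches_schema({"dataset_id": "analytics", "max_results": 10}, schema))  # []
print(matches_schema({"max_results": "ten"}, schema))
```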
  • Registration and dispatch logic in the @server.call_tool() handler that maps 'bq_list_tables' calls to the list_tables implementation.
    elif name == "bq_list_tables":
        result = await list_tables(
            dataset_id=arguments["dataset_id"],
            project_id=arguments.get("project_id"),
            max_results=arguments.get("max_results"),
            table_type_filter=arguments.get("table_type_filter"),
        )
        return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
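The dispatch above serializes the handler's dict as pretty-printed JSON before wrapping it in a `TextContent`, so clients can parse the text body back into the original structure. A small round-trip sketch (the `result` value is hypothetical):

```python
import json

# Hypothetical handler result, serialized exactly as the dispatch above does.
result = {"dataset_id": "analytics", "project": "my-gcp-project", "table_count": 0, "tables": []}
text = json.dumps(result, indent=2)

# The pretty-printed text round-trips losslessly.
assert json.loads(text) == result
print(text.splitlines()[0])  # {
```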
  • Pydantic validation model ListTablesRequest used by the handler for input validation, with field constraints and table_type_filter validator.
    class ListTablesRequest(BaseModel):
        """Request model for listing tables."""

        dataset_id: str = Field(..., min_length=1, max_length=1024)
        project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)
        max_results: int | None = Field(None, ge=1, le=10000)
        table_type_filter: list[str] | None = Field(None)

        @field_validator("table_type_filter")
        @classmethod
        def validate_table_types(cls, v: list[str] | None) -> list[str] | None:
            """Validate table types are valid."""
            if v is None:
                return v
            valid_types = {t.value for t in TableType}
            for table_type in v:
                if table_type not in valid_types:
                    raise ValueError(f"Invalid table type: {table_type}. Must be one of {valid_types}")
            return v

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caron14/mcp-bigquery'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.