Skip to main content
Glama
caron14

BigQuery Validator

by caron14

bq_list_tables

Lists tables in a BigQuery dataset with metadata to validate queries and preview schemas before execution.

Instructions

List all tables in a BigQuery dataset with metadata

Input Schema

Input Schema (JSON Schema)

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| dataset_id | Yes | The dataset ID | — |
| project_id | No | GCP project ID (uses default if not provided) | — |
| max_results | No | Maximum number of tables to return | — |
| table_type_filter | No | Filter by table types (TABLE, VIEW, EXTERNAL, MATERIALIZED_VIEW) | — |

Implementation Reference

  • Core implementation of the `list_tables` handler: validates input, fetches tables from BigQuery dataset using client.list_tables, filters by type, enriches with metadata (partitioning, clustering, stats), and returns formatted JSON response.
    async def list_tables(
        dataset_id: str,
        project_id: str | None = None,
        max_results: int | None = None,
        table_type_filter: list[str] | None = None,
    ) -> dict[str, Any]:
        """List tables in a dataset.

        Validates the raw arguments into a ``ListTablesRequest`` and delegates
        to the internal implementation. Every failure path — validation errors,
        known BigQuery errors, and unexpected exceptions — is converted into a
        structured ``{"error": ...}`` payload instead of propagating.
        """
        raw_args = {
            "dataset_id": dataset_id,
            "project_id": project_id,
            "max_results": max_results,
            "table_type_filter": table_type_filter,
        }
        try:
            request = validate_request(ListTablesRequest, raw_args)
        except MCPBigQueryError as exc:
            return {"error": format_error_response(exc)}

        try:
            return await _list_tables_impl(request)
        except MCPBigQueryError as exc:
            failure = exc
        except Exception as exc:  # pragma: no cover - defensive guard
            # Anything unanticipated is logged with a traceback and surfaced
            # to the caller under a generic LIST_TABLES_ERROR code.
            logger.exception("Unexpected error while listing tables")
            failure = MCPBigQueryError(str(exc), code="LIST_TABLES_ERROR")
        return {"error": format_error_response(failure)}
    
    
    async def _list_tables_impl(request: ListTablesRequest) -> dict[str, Any]:
        """Fetch, filter, and describe every table in the requested dataset.

        Raises ``DatasetNotFoundError`` when the dataset is missing and
        ``TableNotFoundError`` when a listed table disappears before its
        metadata can be fetched.
        """
        client = get_bigquery_client(project_id=request.project_id)
        project = request.project_id or client.project

        kwargs: dict[str, Any] = {"dataset": f"{project}.{request.dataset_id}"}
        if request.max_results is not None:
            kwargs["max_results"] = request.max_results
        try:
            iterator = client.list_tables(**kwargs)
        except NotFound as exc:
            raise DatasetNotFoundError(request.dataset_id, project) from exc

        # An empty/absent filter means "accept every table type".
        wanted = set(request.table_type_filter or ())
        tables: list[dict[str, Any]] = []

        for entry in iterator:
            # The list response is shallow; a per-table fetch is needed for
            # timestamps, stats, partitioning, and clustering metadata.
            try:
                full = client.get_table(entry.reference)
            except NotFound as exc:
                raise TableNotFoundError(entry.table_id, request.dataset_id, project) from exc

            kind = full.table_type
            if wanted and kind not in wanted:
                continue

            info: dict[str, Any] = {
                "table_id": entry.table_id,
                "dataset_id": entry.dataset_id,
                "project": entry.project,
                "table_type": kind,
                "created": serialize_timestamp(full.created),
                "modified": serialize_timestamp(full.modified),
                "description": full.description,
                "labels": full.labels or {},
                # Stats may be absent on some table kinds (e.g. external tables).
                "num_bytes": getattr(full, "num_bytes", None),
                "num_rows": getattr(full, "num_rows", None),
                "location": full.location,
            }

            partitioning = partitioning_overview(full)
            if partitioning:
                info["partitioning"] = partitioning
            clustering = clustering_fields(full)
            if clustering:
                info["clustering_fields"] = clustering

            tables.append(info)

        return {
            "dataset_id": request.dataset_id,
            "project": project,
            "table_count": len(tables),
            "tables": tables,
        }
  • MCP Tool schema definition for 'bq_list_tables', including input schema with properties for dataset_id (required), project_id, max_results, and table_type_filter.
    # MCP tool declaration for bq_list_tables; the input schema mirrors
    # the ListTablesRequest validation model field-for-field.
    types.Tool(
        name="bq_list_tables",
        description=("List all tables in a BigQuery dataset with metadata"),
        inputSchema={
            "type": "object",
            "properties": {
                "dataset_id": {
                    "type": "string",
                    "description": "The dataset ID",
                },
                "project_id": {
                    "type": "string",
                    "description": "GCP project ID (uses default if not provided)",
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of tables to return",
                },
                "table_type_filter": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Filter by table types (TABLE, VIEW, EXTERNAL, MATERIALIZED_VIEW)",
                },
            },
            # Only dataset_id is mandatory; all other arguments are optional.
            "required": ["dataset_id"],
        },
    ),
  • Registration and dispatch logic in the @server.call_tool() handler that maps 'bq_list_tables' calls to the list_tables implementation.
    # Dispatch arm of the @server.call_tool() handler for bq_list_tables.
    elif name == "bq_list_tables":
        # Optional arguments fall back to None via dict.get; list_tables
        # handles its own validation and error formatting.
        result = await list_tables(
            dataset_id=arguments["dataset_id"],
            project_id=arguments.get("project_id"),
            max_results=arguments.get("max_results"),
            table_type_filter=arguments.get("table_type_filter"),
        )
        # Return the structured result as pretty-printed JSON text content.
        return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
  • Pydantic validation model ListTablesRequest used by the handler for input validation, with field constraints and table_type_filter validator.
    class ListTablesRequest(BaseModel):
        """Request model for listing tables.

        Field constraints are enforced by Pydantic; table_type_filter is
        additionally checked against the TableType enum.
        """

        # Dataset IDs are limited to 1024 characters by BigQuery.
        dataset_id: str = Field(..., min_length=1, max_length=1024)
        # None means "use the client's default project".
        project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)
        max_results: int | None = Field(None, ge=1, le=10000)
        table_type_filter: list[str] | None = Field(None)

        @field_validator("table_type_filter")
        @classmethod
        def validate_table_types(cls, v: list[str] | None) -> list[str] | None:
            """Reject the first entry that is not a recognized TableType value."""
            if v is None:
                return v

            valid_types = {t.value for t in TableType}
            offender = next((item for item in v if item not in valid_types), None)
            if offender is not None:
                raise ValueError(f"Invalid table type: {offender}. Must be one of {valid_types}")

            return v

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caron14/mcp-bigquery'

If you have feedback or need assistance with the MCP directory API, please join our Discord server