tables.py
"""Table exploration helpers.""" from __future__ import annotations from typing import Any from google.cloud.exceptions import NotFound from ..bigquery_client import get_bigquery_client from ..exceptions import DatasetNotFoundError, MCPBigQueryError, TableNotFoundError from ..logging_config import get_logger from ..utils import format_error_response from ..validators import GetTableInfoRequest, ListTablesRequest, validate_request from ._formatters import ( clustering_fields, external_table_info, materialized_view_info, partitioning_details, partitioning_overview, serialize_timestamp, streaming_buffer_info, table_statistics, ) logger = get_logger(__name__) async def list_tables( dataset_id: str, project_id: str | None = None, max_results: int | None = None, table_type_filter: list[str] | None = None, ) -> dict[str, Any]: """List tables in a dataset.""" try: request = validate_request( ListTablesRequest, { "dataset_id": dataset_id, "project_id": project_id, "max_results": max_results, "table_type_filter": table_type_filter, }, ) except MCPBigQueryError as exc: return {"error": format_error_response(exc)} try: return await _list_tables_impl(request) except MCPBigQueryError as exc: return {"error": format_error_response(exc)} except Exception as exc: # pragma: no cover - defensive guard logger.exception("Unexpected error while listing tables") wrapped = MCPBigQueryError(str(exc), code="LIST_TABLES_ERROR") return {"error": format_error_response(wrapped)} async def _list_tables_impl(request: ListTablesRequest) -> dict[str, Any]: client = get_bigquery_client(project_id=request.project_id) project = request.project_id or client.project try: list_kwargs: dict[str, Any] = {"dataset": f"{project}.{request.dataset_id}"} if request.max_results is not None: list_kwargs["max_results"] = request.max_results iterator = client.list_tables(**list_kwargs) except NotFound as exc: raise DatasetNotFoundError(request.dataset_id, project) from exc allowed_types = set(request.table_type_filter) if request.table_type_filter else None tables: list[dict[str, Any]] = [] for table in iterator: try: table_ref = client.get_table(table.reference) except NotFound as exc: raise TableNotFoundError(table.table_id, request.dataset_id, project) from exc table_type = table_ref.table_type if allowed_types and table_type not in allowed_types: continue partitioning = partitioning_overview(table_ref) clustering = clustering_fields(table_ref) table_info: dict[str, Any] = { "table_id": table.table_id, "dataset_id": table.dataset_id, "project": table.project, "table_type": table_type, "created": serialize_timestamp(table_ref.created), "modified": serialize_timestamp(table_ref.modified), "description": table_ref.description, "labels": table_ref.labels or {}, "num_bytes": getattr(table_ref, "num_bytes", None), "num_rows": getattr(table_ref, "num_rows", None), "location": table_ref.location, } if partitioning: table_info["partitioning"] = partitioning if clustering: table_info["clustering_fields"] = clustering tables.append(table_info) return { "dataset_id": request.dataset_id, "project": project, "table_count": len(tables), "tables": tables, } async def get_table_info( table_id: str, dataset_id: str, project_id: str | None = None, ) -> dict[str, Any]: """Return comprehensive metadata for a table.""" try: request = validate_request( GetTableInfoRequest, {"table_id": table_id, "dataset_id": dataset_id, "project_id": project_id}, ) except MCPBigQueryError as exc: return {"error": format_error_response(exc)} try: return await 
_get_table_info_impl(request) except MCPBigQueryError as exc: return {"error": format_error_response(exc)} except Exception as exc: # pragma: no cover - defensive guard logger.exception("Unexpected error while fetching table info") wrapped = MCPBigQueryError(str(exc), code="GET_TABLE_INFO_ERROR") return {"error": format_error_response(wrapped)} async def _get_table_info_impl(request: GetTableInfoRequest) -> dict[str, Any]: client = get_bigquery_client(project_id=request.project_id) project = request.project_id or client.project try: table = client.get_table(f"{project}.{request.dataset_id}.{request.table_id}") except NotFound as exc: raise TableNotFoundError(request.table_id, request.dataset_id, project) from exc info: dict[str, Any] = { "table_id": request.table_id, "dataset_id": request.dataset_id, "project": project, "full_table_id": f"{project}.{request.dataset_id}.{request.table_id}", "table_type": table.table_type, "created": serialize_timestamp(table.created), "modified": serialize_timestamp(table.modified), "expires": serialize_timestamp(getattr(table, "expires", None)), "description": table.description, "labels": table.labels or {}, "location": table.location, "self_link": getattr(table, "self_link", None), "etag": getattr(table, "etag", None), "encryption_configuration": ( {"kms_key_name": table.encryption_configuration.kms_key_name} if getattr(table, "encryption_configuration", None) else None ), "friendly_name": getattr(table, "friendly_name", None), "statistics": table_statistics(table), "schema_field_count": len(table.schema) if table.schema else 0, } if table.table_type == "TABLE": info["time_travel"] = { "max_time_travel_hours": getattr(table, "max_time_travel_hours", 168), } if table.table_type == "VIEW": info["view"] = { "query": getattr(table, "view_query", None), "use_legacy_sql": getattr(table, "view_use_legacy_sql", None), } materialized = materialized_view_info(table) if materialized: info["materialized_view"] = materialized external = external_table_info(table) if external: info["external"] = external streaming = streaming_buffer_info(table) if streaming: info["streaming_buffer"] = streaming partitioning = partitioning_details(table) if partitioning: info["partitioning"] = partitioning clustering = clustering_fields(table) if clustering: info["clustering"] = {"fields": clustering} if getattr(table, "table_constraints", None): constraints = table.table_constraints info["table_constraints"] = { "primary_key": (constraints.primary_key.columns if constraints.primary_key else None), "foreign_keys": ( [ { "name": fk.name, "referenced_table": fk.referenced_table.table_id, "column_references": fk.column_references, } for fk in constraints.foreign_keys ] if constraints.foreign_keys else [] ), } return info
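
For context, a minimal calling sketch for the two public coroutines above. The import path `mcp_bigquery.tools.tables` and the dataset name `my_dataset` are assumptions for illustration (the real package layout isn't shown on this page), and the sketch presumes BigQuery credentials are already configured:

import asyncio

# Hypothetical import path -- adjust to the package's actual layout.
from mcp_bigquery.tools.tables import get_table_info, list_tables


async def main() -> None:
    # Both helpers return a dict; validation and BigQuery errors come back
    # under an "error" key instead of raising, so callers branch on shape.
    listing = await list_tables("my_dataset", table_type_filter=["TABLE"])
    if "error" in listing:
        print("list_tables failed:", listing["error"])
        return

    for table in listing["tables"]:
        info = await get_table_info(table["table_id"], "my_dataset")
        print(table["table_id"], info.get("statistics"))


asyncio.run(main())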
