cloud_bigquery.py
import datetime
import json
from typing import Any, Dict, List, Optional

from google.cloud import bigquery
from google.cloud.exceptions import NotFound

from services import client_instances


def register(mcp_instance):
    """Register all BigQuery resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://bigquery/{project_id}/datasets")
    def list_datasets_resource(project_id: str = None) -> str:
        """List all BigQuery datasets in a project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            result = []
            for item in client.list_datasets(project=project_id):
                # list_datasets() returns lightweight DatasetListItem objects,
                # which do not carry location/created; fetch the full dataset
                dataset = client.get_dataset(item.reference)
                result.append(
                    {
                        "id": dataset.dataset_id,
                        "full_id": dataset.full_dataset_id,
                        "friendly_name": dataset.friendly_name,
                        "location": dataset.location,
                        "labels": dict(dataset.labels) if dataset.labels else {},
                        "created": dataset.created.isoformat()
                        if dataset.created
                        else None,
                    }
                )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}")
    def get_dataset_resource(project_id: str = None, dataset_id: str = None) -> str:
        """Get details for a specific BigQuery dataset"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            # Client.dataset() is deprecated; a "project.dataset" string works
            # directly with get_dataset() and honors the requested project
            dataset = client.get_dataset(f"{project_id}.{dataset_id}")

            result = {
                "id": dataset.dataset_id,
                "full_id": dataset.full_dataset_id,
                "friendly_name": dataset.friendly_name,
                "description": dataset.description,
                "location": dataset.location,
                "labels": dict(dataset.labels) if dataset.labels else {},
                "created": dataset.created.isoformat() if dataset.created else None,
                "modified": dataset.modified.isoformat() if dataset.modified else None,
                "default_table_expiration_ms": dataset.default_table_expiration_ms,
                "default_partition_expiration_ms": dataset.default_partition_expiration_ms,
            }
            return json.dumps(result, indent=2)
        except NotFound:
            return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}/tables")
    def list_tables_resource(project_id: str = None, dataset_id: str = None) -> str:
        """List all tables in a BigQuery dataset"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            tables = list(client.list_tables(f"{project_id}.{dataset_id}"))
            result = []
            for table in tables:
                result.append(
                    {
                        "id": table.table_id,
                        "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
                        "table_type": table.table_type,
                    }
                )
            return json.dumps(result, indent=2)
        except NotFound:
            return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource(
        "gcp://bigquery/{project_id}/dataset/{dataset_id}/table/{table_id}"
    )
    def get_table_resource(
        project_id: str = None, dataset_id: str = None, table_id: str = None
    ) -> str:
        """Get details for a specific BigQuery table"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            table = client.get_table(f"{project_id}.{dataset_id}.{table_id}")

            # Extract schema information
            schema_fields = []
            for field in table.schema:
                schema_fields.append(
                    {
                        "name": field.name,
                        "type": field.field_type,
                        "mode": field.mode,
                        "description": field.description,
                    }
                )

            result = {
                "id": table.table_id,
                "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
                "friendly_name": table.friendly_name,
                "description": table.description,
                "num_rows": table.num_rows,
                "num_bytes": table.num_bytes,
                "table_type": table.table_type,
                "created": table.created.isoformat() if table.created else None,
                "modified": table.modified.isoformat() if table.modified else None,
                "expires": table.expires.isoformat() if table.expires else None,
                "schema": schema_fields,
                "labels": dict(table.labels) if table.labels else {},
            }
            return json.dumps(result, indent=2)
        except NotFound:
            return json.dumps(
                {"error": f"Table {dataset_id}.{table_id} not found"}, indent=2
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)
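
    # A minimal sketch of how these resource templates resolve, assuming a
    # FastMCP-style server; the project/dataset/table names are hypothetical:
    #
    #   gcp://bigquery/my-project/datasets
    #   gcp://bigquery/my-project/dataset/my_dataset/tables
    #   gcp://bigquery/my-project/dataset/my_dataset/table/my_table
    #
    # Each resource returns a JSON string, so clients can json.loads() the body.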

    # Tools
    @mcp_instance.tool()
    def run_query(
        query: str,
        project_id: str = None,
        location: str = None,
        max_results: int = 100,
        use_legacy_sql: bool = False,
        timeout_ms: int = 30000,
    ) -> str:
        """
        Run a BigQuery query and return the results

        Args:
            query: SQL query to execute
            project_id: GCP project ID (defaults to configured project)
            location: Optional BigQuery location (us, eu, etc.)
            max_results: Maximum number of rows to return
            use_legacy_sql: Whether to use legacy SQL syntax
            timeout_ms: Query timeout in milliseconds
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()
            location = location or client_instances.get_location()

            job_config = bigquery.QueryJobConfig(
                use_legacy_sql=use_legacy_sql,
            )

            # Log info (similar to ctx.info)
            print(f"Running query: {query[:100]}...")

            query_job = client.query(
                query,
                job_config=job_config,
                location=location,
                timeout=timeout_ms / 1000.0,
            )

            # Wait for the query to complete
            results = query_job.result(max_results=max_results)

            # Get the schema
            schema = [field.name for field in results.schema]

            # Convert rows to a list of dictionaries
            rows = []
            for row in results:
                row_dict = {}
                for key in schema:
                    value = row[key]
                    if hasattr(value, "isoformat"):  # Handle datetime objects
                        row_dict[key] = value.isoformat()
                    else:
                        row_dict[key] = value
                rows.append(row_dict)

            # Create summary statistics; total_rows lives on the row iterator,
            # not on the QueryJob
            stats = {
                "total_rows": results.total_rows,
                "total_bytes_processed": query_job.total_bytes_processed,
                "total_bytes_billed": query_job.total_bytes_billed,
                "billing_tier": query_job.billing_tier,
                "created": query_job.created.isoformat() if query_job.created else None,
                "started": query_job.started.isoformat() if query_job.started else None,
                "ended": query_job.ended.isoformat() if query_job.ended else None,
                "duration_ms": (
                    (query_job.ended - query_job.started).total_seconds() * 1000
                    if query_job.started and query_job.ended
                    else None
                ),
            }

            return json.dumps(
                {
                    "status": "success",
                    "schema": schema,
                    "rows": rows,
                    "returned_rows": len(rows),
                    "stats": stats,
                },
                indent=2,
                default=str,  # NUMERIC, BYTES, etc. are not natively JSON-serializable
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)
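
    # Hedged usage sketch: the public dataset below is real, but treat the call
    # shape as illustrative -- in practice the MCP client invokes this tool:
    #
    #   run_query(
    #       "SELECT name, SUM(number) AS total "
    #       "FROM `bigquery-public-data.usa_names.usa_1910_2013` "
    #       "GROUP BY name ORDER BY total DESC LIMIT 10"
    #   )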

    @mcp_instance.tool()
    def create_dataset(
        dataset_id: str,
        project_id: str = None,
        location: str = None,
        description: str = "",
        friendly_name: str = None,
        labels: Optional[Dict[str, str]] = None,
        default_table_expiration_ms: Optional[int] = None,
    ) -> str:
        """
        Create a new BigQuery dataset

        Args:
            dataset_id: ID for the new dataset
            project_id: GCP project ID (defaults to configured project)
            location: Dataset location (US, EU, asia-northeast1, etc.)
            description: Optional dataset description
            friendly_name: Optional user-friendly name
            labels: Optional key-value pairs for dataset labels
            default_table_expiration_ms: Default expiration time for tables in milliseconds
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()
            location = location or client_instances.get_location()

            dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
            dataset.location = location

            if description:
                dataset.description = description
            if friendly_name:
                dataset.friendly_name = friendly_name
            if labels:
                dataset.labels = labels
            if default_table_expiration_ms:
                dataset.default_table_expiration_ms = default_table_expiration_ms

            # Log info (similar to ctx.info)
            print(f"Creating dataset {dataset_id} in {location}...")

            dataset = client.create_dataset(dataset)

            return json.dumps(
                {
                    "status": "success",
                    "dataset_id": dataset.dataset_id,
                    "full_dataset_id": dataset.full_dataset_id,
                    "location": dataset.location,
                    "created": dataset.created.isoformat() if dataset.created else None,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)
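
    # Illustrative call (dataset name and labels are hypothetical):
    #
    #   create_dataset(
    #       "analytics_staging",
    #       location="EU",
    #       labels={"env": "staging"},
    #       default_table_expiration_ms=7 * 24 * 60 * 60 * 1000,  # 7 days
    #   )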

    @mcp_instance.tool()
    def create_table(
        dataset_id: str,
        table_id: str,
        schema_fields: List[Dict[str, Any]],
        project_id: str = None,
        description: str = "",
        friendly_name: str = None,
        expiration_ms: Optional[int] = None,
        labels: Optional[Dict[str, str]] = None,
        clustering_fields: Optional[List[str]] = None,
        time_partitioning_field: Optional[str] = None,
        time_partitioning_type: str = "DAY",
    ) -> str:
        """
        Create a new BigQuery table

        Args:
            dataset_id: Dataset ID where the table will be created
            table_id: ID for the new table
            schema_fields: List of field definitions, each with name, type, mode, and description
            project_id: GCP project ID (defaults to configured project)
            description: Optional table description
            friendly_name: Optional user-friendly name
            expiration_ms: Optional table expiration time in milliseconds
            labels: Optional key-value pairs for table labels
            clustering_fields: Optional list of fields to cluster by
            time_partitioning_field: Optional field to use for time-based partitioning
            time_partitioning_type: Partitioning type (DAY, HOUR, MONTH, YEAR)

        Example schema_fields:
        [
            {"name": "name", "type": "STRING", "mode": "REQUIRED", "description": "Name field"},
            {"name": "age", "type": "INTEGER", "mode": "NULLABLE", "description": "Age field"}
        ]
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            # Convert schema_fields to SchemaField objects
            schema = []
            for field in schema_fields:
                schema.append(
                    bigquery.SchemaField(
                        name=field["name"],
                        field_type=field["type"],
                        mode=field.get("mode", "NULLABLE"),
                        description=field.get("description", ""),
                    )
                )

            # Create the table object from a fully qualified path
            table = bigquery.Table(
                f"{project_id}.{dataset_id}.{table_id}", schema=schema
            )

            # Set table properties
            if description:
                table.description = description
            if friendly_name:
                table.friendly_name = friendly_name
            if expiration_ms:
                # Table.expires expects a datetime, so treat expiration_ms as a
                # lifetime relative to now rather than assigning the raw integer
                table.expires = datetime.datetime.now(
                    datetime.timezone.utc
                ) + datetime.timedelta(milliseconds=expiration_ms)
            if labels:
                table.labels = labels

            # Set clustering if specified
            if clustering_fields:
                table.clustering_fields = clustering_fields

            # Set time partitioning if specified
            if time_partitioning_field:
                partition_types = {
                    "DAY": bigquery.TimePartitioningType.DAY,
                    "HOUR": bigquery.TimePartitioningType.HOUR,
                    "MONTH": bigquery.TimePartitioningType.MONTH,
                    "YEAR": bigquery.TimePartitioningType.YEAR,
                }
                if time_partitioning_type in partition_types:
                    table.time_partitioning = bigquery.TimePartitioning(
                        type_=partition_types[time_partitioning_type],
                        field=time_partitioning_field,
                    )

            # Log info (similar to ctx.info)
            print(f"Creating table {dataset_id}.{table_id}...")

            table = client.create_table(table)

            return json.dumps(
                {
                    "status": "success",
                    "table_id": table.table_id,
                    "full_table_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
                    "created": table.created.isoformat() if table.created else None,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def delete_table(dataset_id: str, table_id: str, project_id: str = None) -> str:
        """
        Delete a BigQuery table

        Args:
            dataset_id: Dataset ID containing the table
            table_id: ID of the table to delete
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            # Log info (similar to ctx.info)
            print(f"Deleting table {dataset_id}.{table_id}...")

            client.delete_table(f"{project_id}.{dataset_id}.{table_id}")

            return json.dumps(
                {
                    "status": "success",
                    "message": f"Table {project_id}.{dataset_id}.{table_id} successfully deleted",
                },
                indent=2,
            )
        except NotFound:
            return json.dumps(
                {
                    "status": "error",
                    "message": f"Table {project_id}.{dataset_id}.{table_id} not found",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)
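
    # Illustrative call mirroring the docstring's schema example; the table and
    # field names are hypothetical:
    #
    #   create_table(
    #       "analytics_staging",
    #       "events",
    #       schema_fields=[
    #           {"name": "event_ts", "type": "TIMESTAMP", "mode": "REQUIRED"},
    #           {"name": "user_id", "type": "STRING", "mode": "REQUIRED"},
    #       ],
    #       time_partitioning_field="event_ts",
    #       time_partitioning_type="DAY",
    #       clustering_fields=["user_id"],
    #   )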

    @mcp_instance.tool()
    def load_table_from_json(
        dataset_id: str,
        table_id: str,
        json_data: List[Dict[str, Any]],
        project_id: str = None,
        schema_fields: Optional[List[Dict[str, Any]]] = None,
        write_disposition: str = "WRITE_APPEND",
    ) -> str:
        """
        Load data into a BigQuery table from JSON data

        Args:
            dataset_id: Dataset ID containing the table
            table_id: ID of the table to load data into
            json_data: List of dictionaries representing rows to insert
            project_id: GCP project ID (defaults to configured project)
            schema_fields: Optional schema definition (if not using existing table schema)
            write_disposition: How to handle existing data (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            table_ref = f"{project_id}.{dataset_id}.{table_id}"

            # Convert write_disposition to the appropriate enum
            dispositions = {
                "WRITE_TRUNCATE": bigquery.WriteDisposition.WRITE_TRUNCATE,
                "WRITE_APPEND": bigquery.WriteDisposition.WRITE_APPEND,
                "WRITE_EMPTY": bigquery.WriteDisposition.WRITE_EMPTY,
            }
            if write_disposition not in dispositions:
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"Invalid write_disposition: {write_disposition}. "
                        "Use WRITE_TRUNCATE, WRITE_APPEND, or WRITE_EMPTY.",
                    },
                    indent=2,
                )
            disposition = dispositions[write_disposition]

            # Create job config
            job_config = bigquery.LoadJobConfig(
                write_disposition=disposition,
                source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            )

            # Set schema if provided
            if schema_fields:
                schema = []
                for field in schema_fields:
                    schema.append(
                        bigquery.SchemaField(
                            name=field["name"],
                            field_type=field["type"],
                            mode=field.get("mode", "NULLABLE"),
                            description=field.get("description", ""),
                        )
                    )
                job_config.schema = schema

            # Log info (similar to ctx.info)
            print(f"Loading {len(json_data)} rows into {dataset_id}.{table_id}...")

            # Create and run the load job; the client converts the rows to
            # newline-delimited JSON internally
            load_job = client.load_table_from_json(
                json_data, table_ref, job_config=job_config
            )

            # Wait for the job to complete
            load_job.result()

            # Get updated table info
            table = client.get_table(table_ref)

            return json.dumps(
                {
                    "status": "success",
                    "rows_loaded": len(json_data),
                    "total_rows": table.num_rows,
                    "message": f"Successfully loaded data into {project_id}.{dataset_id}.{table_id}",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def export_table_to_csv(
        dataset_id: str,
        table_id: str,
        destination_uri: str,
        project_id: str = None,
        print_header: bool = True,
        field_delimiter: str = ",",
    ) -> str:
        """
        Export a BigQuery table to Cloud Storage as CSV

        Args:
            dataset_id: Dataset ID containing the table
            table_id: ID of the table to export
            destination_uri: GCS URI (gs://bucket/path)
            project_id: GCP project ID (defaults to configured project)
            print_header: Whether to include column headers
            field_delimiter: Delimiter character for fields
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            table_ref = f"{project_id}.{dataset_id}.{table_id}"

            # Validate destination URI
            if not destination_uri.startswith("gs://"):
                return json.dumps(
                    {
                        "status": "error",
                        "message": "destination_uri must start with gs://",
                    },
                    indent=2,
                )

            job_config = bigquery.ExtractJobConfig()
            job_config.destination_format = bigquery.DestinationFormat.CSV
            job_config.print_header = print_header
            job_config.field_delimiter = field_delimiter

            # Log info (similar to ctx.info)
            print(f"Exporting {dataset_id}.{table_id} to {destination_uri}...")

            # Create and run the extract job
            extract_job = client.extract_table(
                table_ref, destination_uri, job_config=job_config
            )

            # Wait for the job to complete
            extract_job.result()

            return json.dumps(
                {
                    "status": "success",
                    "destination": destination_uri,
                    "message": f"Successfully exported {project_id}.{dataset_id}.{table_id} to {destination_uri}",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)
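
    # Illustrative load followed by an export; the bucket, dataset, and table
    # names are hypothetical:
    #
    #   load_table_from_json(
    #       "analytics_staging",
    #       "events",
    #       json_data=[{"event_ts": "2024-01-01T00:00:00Z", "user_id": "u1"}],
    #       write_disposition="WRITE_APPEND",
    #   )
    #   export_table_to_csv("analytics_staging", "events", "gs://my-bucket/events.csv")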

    # Prompts
    @mcp_instance.prompt()
    def create_dataset_prompt() -> str:
        """Prompt for creating a new BigQuery dataset"""
        return """
        I need to create a new BigQuery dataset. Please help me with:

        1. Choosing an appropriate location for my dataset
        2. Understanding dataset naming conventions
        3. Best practices for dataset configuration (expiration, labels, etc.)
        4. The process to create the dataset
        """

    @mcp_instance.prompt()
    def query_optimization_prompt() -> str:
        """Prompt for BigQuery query optimization help"""
        return """
        I have a BigQuery query that's slow or expensive to run. Please help me optimize it by:

        1. Analyzing key factors that affect BigQuery performance
        2. Identifying common patterns that lead to inefficient queries
        3. Suggesting specific optimization techniques
        4. Helping me understand how to use EXPLAIN plan analysis
        """
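
# Hedged wiring sketch, assuming a FastMCP-style server object; the server
# name and bootstrap module are hypothetical and depend on how this repo
# starts its MCP server:
#
#   from mcp.server.fastmcp import FastMCP
#   import cloud_bigquery
#
#   mcp = FastMCP("gcp-mcp")
#   cloud_bigquery.register(mcp)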
