
BigQuery Validator

by caron14
info_schema.py • 13.5 kB
"""BigQuery INFORMATION_SCHEMA and performance analysis utilities.""" import json import os from typing import Any, Optional from google.cloud import bigquery from google.cloud.exceptions import BadRequest from .bigquery_client import get_bigquery_client # Common INFORMATION_SCHEMA queries INFO_SCHEMA_TEMPLATES = { "tables": """ SELECT table_catalog, table_schema, table_name, table_type, creation_time, ddl FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLES` {where_clause} ORDER BY table_name {limit_clause} """, "columns": """ SELECT table_catalog, table_schema, table_name, column_name, ordinal_position, is_nullable, data_type, is_partitioning_column, clustering_ordinal_position FROM `{project}.{dataset}.INFORMATION_SCHEMA.COLUMNS` {where_clause} ORDER BY table_name, ordinal_position {limit_clause} """, "table_storage": """ SELECT table_catalog, table_schema, table_name, creation_time, total_rows, total_partitions, total_logical_bytes, active_logical_bytes, long_term_logical_bytes, total_physical_bytes, active_physical_bytes, long_term_physical_bytes, time_travel_physical_bytes FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLE_STORAGE` {where_clause} ORDER BY total_logical_bytes DESC {limit_clause} """, "partitions": """ SELECT table_catalog, table_schema, table_name, partition_id, total_rows, total_logical_bytes, total_physical_bytes, last_modified_time FROM `{project}.{dataset}.INFORMATION_SCHEMA.PARTITIONS` {where_clause} ORDER BY table_name, partition_id {limit_clause} """, "views": """ SELECT table_catalog, table_schema, table_name, view_definition, use_standard_sql FROM `{project}.{dataset}.INFORMATION_SCHEMA.VIEWS` {where_clause} ORDER BY table_name {limit_clause} """, "routines": """ SELECT routine_catalog, routine_schema, routine_name, routine_type, language, routine_definition, created, last_altered FROM `{project}.{dataset}.INFORMATION_SCHEMA.ROUTINES` {where_clause} ORDER BY routine_name {limit_clause} """, } async def query_info_schema( query_type: str, dataset_id: str, project_id: Optional[str] = None, table_filter: Optional[str] = None, custom_query: Optional[str] = None, limit: Optional[int] = 100, ) -> dict[str, Any]: """ Execute INFORMATION_SCHEMA queries in dry-run mode. Args: query_type: Type of query (tables, columns, table_storage, partitions, views, routines, custom) dataset_id: The dataset to query metadata for project_id: GCP project ID (uses default if not provided) table_filter: Optional table name filter custom_query: Custom INFORMATION_SCHEMA query (when query_type is 'custom') limit: Maximum number of results Returns: Dict with query results or error information """ try: client = get_bigquery_client() project = project_id or client.project # Build the query if query_type == "custom" and custom_query: # Use custom query directly query = custom_query elif query_type in INFO_SCHEMA_TEMPLATES: # Build query from template where_clause = "" if table_filter: where_clause = f"WHERE table_name = '{table_filter}'" limit_clause = "" if limit: limit_clause = f"LIMIT {limit}" query = INFO_SCHEMA_TEMPLATES[query_type].format( project=project, dataset=dataset_id, where_clause=where_clause, limit_clause=limit_clause, ) else: return { "error": { "code": "INVALID_QUERY_TYPE", "message": f"Invalid query type '{query_type}'. 
Must be one of: {', '.join(INFO_SCHEMA_TEMPLATES.keys())}, custom", } } # Execute query in dry-run mode job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False) query_job = client.query(query, job_config=job_config) # Get schema information from dry-run schema = [] if query_job.schema: for field in query_job.schema: schema.append( { "name": field.name, "type": field.field_type, "mode": field.mode, "description": field.description, } ) # Calculate cost estimate bytes_processed = query_job.total_bytes_processed or 0 price_per_tib = float(os.environ.get("SAFE_PRICE_PER_TIB", "5.0")) bytes_per_tib = 1024**4 estimated_cost_usd = (bytes_processed / bytes_per_tib) * price_per_tib result = { "query_type": query_type, "dataset_id": dataset_id, "project": project, "query": query.strip(), "schema": schema, "metadata": { "total_bytes_processed": bytes_processed, "estimated_cost_usd": round(estimated_cost_usd, 6), "cache_hit": False, # Always false for dry-run }, "info": "Query validated successfully. Execute without dry_run to get actual results.", } if table_filter: result["table_filter"] = table_filter return result except BadRequest as e: return { "error": { "code": "QUERY_ERROR", "message": str(e), "query": query if "query" in locals() else None, } } except Exception as e: return {"error": {"code": "INFO_SCHEMA_ERROR", "message": str(e)}} async def analyze_query_performance(sql: str, project_id: Optional[str] = None) -> dict[str, Any]: """ Analyze query performance using dry-run execution plan. Args: sql: The SQL query to analyze project_id: GCP project ID (uses default if not provided) Returns: Dict with performance analysis and optimization suggestions """ try: client = get_bigquery_client() project = project_id or client.project # Execute dry-run to get query plan job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False) query_job = client.query(sql, job_config=job_config) # Extract performance metrics bytes_processed = query_job.total_bytes_processed or 0 bytes_billed = query_job.total_bytes_billed or bytes_processed # Calculate costs price_per_tib = float(os.environ.get("SAFE_PRICE_PER_TIB", "5.0")) bytes_per_tib = 1024**4 bytes_per_gib = 1024**3 estimated_cost_usd = (bytes_billed / bytes_per_tib) * price_per_tib # Analyze referenced tables referenced_tables = [] if query_job.referenced_tables: for table_ref in query_job.referenced_tables: referenced_tables.append( { "project": table_ref.project, "dataset": table_ref.dataset_id, "table": table_ref.table_id, "full_id": f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}", } ) # Performance analysis performance_analysis = { "bytes_processed": bytes_processed, "bytes_billed": bytes_billed, "gigabytes_processed": round(bytes_processed / bytes_per_gib, 3), "estimated_cost_usd": round(estimated_cost_usd, 6), "slot_milliseconds": ( query_job.estimated_bytes_processed if hasattr(query_job, "estimated_bytes_processed") else None ), "referenced_tables": referenced_tables, "table_count": len(referenced_tables), } # Generate optimization suggestions suggestions = [] # Check for high data scan if bytes_processed > 100 * bytes_per_gib: # More than 100 GB suggestions.append( { "type": "HIGH_DATA_SCAN", "severity": "HIGH", "message": f"Query will process {round(bytes_processed / bytes_per_gib, 2)} GB of data", "recommendation": "Consider adding WHERE clauses, using partitioning, or limiting date ranges", } ) # Check for SELECT * if "SELECT *" in sql.upper() or "SELECT\n*" in sql.upper(): suggestions.append( { 
"type": "SELECT_STAR", "severity": "MEDIUM", "message": "Query uses SELECT * which processes all columns", "recommendation": "Select only the columns you need to reduce data processed", } ) # Check for missing LIMIT without ORDER BY has_limit = "LIMIT" in sql.upper() has_order_by = "ORDER BY" in sql.upper() if has_limit and not has_order_by: suggestions.append( { "type": "LIMIT_WITHOUT_ORDER", "severity": "LOW", "message": "LIMIT without ORDER BY may return inconsistent results", "recommendation": "Add ORDER BY clause to ensure consistent results", } ) # Check for CROSS JOIN if "CROSS JOIN" in sql.upper(): suggestions.append( { "type": "CROSS_JOIN", "severity": "HIGH", "message": "CROSS JOIN can produce very large result sets", "recommendation": "Verify that CROSS JOIN is necessary, consider using INNER JOIN with conditions", } ) # Check for subqueries in WHERE clause if "WHERE" in sql.upper() and "SELECT" in sql.upper()[sql.upper().index("WHERE") :]: suggestions.append( { "type": "SUBQUERY_IN_WHERE", "severity": "MEDIUM", "message": "Subquery in WHERE clause may impact performance", "recommendation": "Consider using JOIN or WITH clause instead", } ) # Check for multiple table scans if len(referenced_tables) > 5: suggestions.append( { "type": "MANY_TABLES", "severity": "MEDIUM", "message": f"Query references {len(referenced_tables)} tables", "recommendation": "Consider creating intermediate tables or materialized views for complex joins", } ) # Calculate performance score (0-100) score = 100 # Deduct points based on data volume if bytes_processed > 1 * bytes_per_tib: score -= 30 elif bytes_processed > 100 * bytes_per_gib: score -= 20 elif bytes_processed > 10 * bytes_per_gib: score -= 10 # Deduct points for issues for suggestion in suggestions: if suggestion["severity"] == "HIGH": score -= 15 elif suggestion["severity"] == "MEDIUM": score -= 10 elif suggestion["severity"] == "LOW": score -= 5 score = max(0, score) # Ensure score doesn't go below 0 # Determine performance rating if score >= 80: rating = "EXCELLENT" elif score >= 60: rating = "GOOD" elif score >= 40: rating = "FAIR" else: rating = "NEEDS_OPTIMIZATION" return { "query_analysis": performance_analysis, "performance_score": score, "performance_rating": rating, "optimization_suggestions": suggestions, "suggestion_count": len(suggestions), "estimated_execution": { "note": "Actual execution time depends on cluster resources and current load", "complexity_indicator": ( "HIGH" if bytes_processed > 100 * bytes_per_gib else "MEDIUM" if bytes_processed > 10 * bytes_per_gib else "LOW" ), }, } except BadRequest as e: return {"error": {"code": "ANALYSIS_ERROR", "message": str(e)}} except Exception as e: return {"error": {"code": "PERFORMANCE_ANALYSIS_ERROR", "message": str(e)}}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caron14/mcp-bigquery'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.