bq_analyze_query_performance

Analyze BigQuery SQL query performance to identify optimization opportunities and improve execution efficiency.

Instructions

Analyze query performance and provide optimization suggestions

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| sql | Yes | The SQL query to analyze | |
| project_id | No | GCP project ID (uses default if not provided) | |
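
For illustration, a call to this tool only needs "sql"; "project_id" is optional and falls back to the server's default project. The dataset and project names below are made up:

    arguments = {
        "sql": "SELECT order_id, total FROM `my-gcp-project.shop.orders` WHERE order_date >= '2024-01-01'",
        "project_id": "my-gcp-project",  # optional; placeholder project ID
    }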

Implementation Reference

  • Core handler for 'bq_analyze_query_performance': runs a BigQuery dry-run of the query, estimates bytes processed and cost, lists referenced tables, and produces a performance score, rating, and optimization suggestions.
    async def analyze_query_performance(
        sql: str,
        project_id: str | None = None,
    ) -> dict[str, Any]:
        """Dry-run a query and provide performance insights."""
        try:
            request = validate_request(
                AnalyzePerformanceRequest,
                {"sql": sql, "project_id": project_id},
            )
        except MCPBigQueryError as exc:
            return {"error": format_error_response(exc)}

        try:
            return await _analyze_query_performance_impl(request)
        except MCPBigQueryError as exc:
            return {"error": format_error_response(exc)}
        except Exception as exc:  # pragma: no cover - defensive guard
            logger.exception("Unexpected error during performance analysis")
            wrapped = MCPBigQueryError(str(exc), code="PERFORMANCE_ANALYSIS_ERROR")
            return {"error": format_error_response(wrapped)}

    async def _analyze_query_performance_impl(request: AnalyzePerformanceRequest) -> dict[str, Any]:
        client = get_bigquery_client(project_id=request.project_id)
        project = request.project_id or client.project

        job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        try:
            query_job = client.query(request.sql, job_config=job_config)
        except BadRequest as exc:
            raise MCPBigQueryError(str(exc), code="ANALYSIS_ERROR") from exc

        bytes_processed = query_job.total_bytes_processed or 0
        bytes_billed = query_job.total_bytes_billed or bytes_processed
        price_per_tib = get_config().price_per_tib
        bytes_per_tib = 1024**4
        bytes_per_gib = 1024**3
        estimated_cost_usd = (bytes_billed / bytes_per_tib) * price_per_tib

        referenced_tables = []
        if query_job.referenced_tables:
            for table_ref in query_job.referenced_tables:
                referenced_tables.append(
                    {
                        "project": table_ref.project,
                        "dataset": table_ref.dataset_id,
                        "table": table_ref.table_id,
                        "full_id": f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
                    }
                )

        performance_analysis = {
            "bytes_processed": bytes_processed,
            "bytes_billed": bytes_billed,
            "gigabytes_processed": round(bytes_processed / bytes_per_gib, 3),
            "estimated_cost_usd": round(estimated_cost_usd, 6),
            "slot_milliseconds": getattr(query_job, "estimated_bytes_processed", None),
            "referenced_tables": referenced_tables,
            "table_count": len(referenced_tables),
        }

        suggestions = _build_suggestions(request.sql, bytes_processed, referenced_tables, bytes_per_gib)
        score, rating = _score_performance(bytes_processed, suggestions, bytes_per_gib, bytes_per_tib)

        return {
            "query_analysis": performance_analysis,
            "performance_score": score,
            "performance_rating": rating,
            "optimization_suggestions": suggestions,
            "suggestion_count": len(suggestions),
            "estimated_execution": {
                "note": "Actual execution time depends on cluster resources and current load",
                "complexity_indicator": _complexity_indicator(bytes_processed, bytes_per_gib),
            },
            "project": project,
        }
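
    A minimal usage sketch, assuming application-default credentials and a default GCP project are configured; the public-dataset query is only an example:

        import asyncio

        async def main() -> None:
            result = await analyze_query_performance(
                sql=(
                    "SELECT name, SUM(number) AS total "
                    "FROM `bigquery-public-data.usa_names.usa_1910_2013` "
                    "WHERE year = 2000 GROUP BY name"
                ),
            )
            if "error" in result:  # handler returns {"error": ...} on validation or analysis failures
                print(result["error"])
            else:
                analysis = result["query_analysis"]
                print(analysis["gigabytes_processed"], "GiB scanned,",
                      result["suggestion_count"], "suggestions")

        asyncio.run(main())
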
  • Pydantic model defining the input schema and validation for the tool, matching the registered inputSchema.
    class AnalyzePerformanceRequest(BaseModel):
        """Request model for analyzing query performance."""

        sql: str = Field(..., min_length=1, max_length=DEFAULT_LIMITS["max_query_length"])
        project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)

        @field_validator("sql")
        @classmethod
        def validate_sql_not_empty(cls, v: str) -> str:
            """Ensure SQL is not just whitespace."""
            if not v.strip():
                raise ValueError("SQL query cannot be empty or only whitespace")
            return v
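
    A quick validation sketch (the model relies on Pydantic v2, as implied by field_validator); the printed message is roughly what the validator raises:

        from pydantic import ValidationError

        ok = AnalyzePerformanceRequest(sql="SELECT 1", project_id="my-gcp-project")  # placeholder project ID

        try:
            AnalyzePerformanceRequest(sql="   ")  # whitespace-only SQL is rejected
        except ValidationError as exc:
            print(exc.errors()[0]["msg"])  # contains "SQL query cannot be empty or only whitespace"
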
  • MCP tool registration in list_tools(), defining name, description, and JSON input schema.
    types.Tool(
        name="bq_analyze_query_performance",
        description=("Analyze query performance and provide optimization suggestions"),
        inputSchema={
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "The SQL query to analyze",
                },
                "project_id": {
                    "type": "string",
                    "description": "GCP project ID (uses default if not provided)",
                },
            },
            "required": ["sql"],
        },
    ),

  • Tool dispatch handler in call_tool() that invokes the analyze_query_performance function.
    elif name == "bq_analyze_query_performance":
        result = await analyze_query_performance(
            sql=arguments["sql"], project_id=arguments.get("project_id")
        )
        return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
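
    For completeness, a client-side sketch using the MCP Python SDK; the launch command below is a placeholder and depends on how the mcp-bigquery server is installed locally:

        import asyncio

        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client

        async def run() -> None:
            params = StdioServerParameters(command="mcp-bigquery", args=[])  # placeholder command
            async with stdio_client(params) as (read, write):
                async with ClientSession(read, write) as session:
                    await session.initialize()
                    result = await session.call_tool(
                        "bq_analyze_query_performance",
                        {"sql": "SELECT 1"},
                    )
                    print(result.content[0].text)  # the JSON string produced by json.dumps above

        asyncio.run(run())
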
  • Helper function to generate specific optimization suggestions based on query patterns and dry-run results.
    def _build_suggestions(
        sql: str,
        bytes_processed: int,
        referenced_tables: list[dict[str, Any]],
        bytes_per_gib: int,
    ) -> list[dict[str, Any]]:
        sql_upper = sql.upper()
        suggestions: list[dict[str, Any]] = []

        if bytes_processed > 100 * bytes_per_gib:
            suggestions.append(
                {
                    "type": "HIGH_DATA_SCAN",
                    "severity": "HIGH",
                    "message": f"Query will process {round(bytes_processed / bytes_per_gib, 2)} GB of data",
                    "recommendation": "Consider adding WHERE clauses, using partitioning, or limiting date ranges",
                }
            )

        if "SELECT *" in sql_upper or "SELECT\n*" in sql_upper:
            suggestions.append(
                {
                    "type": "SELECT_STAR",
                    "severity": "MEDIUM",
                    "message": "Query uses SELECT * which processes all columns",
                    "recommendation": "Select only the columns you need to reduce data processed",
                }
            )

        has_limit = "LIMIT" in sql_upper
        has_order_by = "ORDER BY" in sql_upper
        if has_limit and not has_order_by:
            suggestions.append(
                {
                    "type": "LIMIT_WITHOUT_ORDER",
                    "severity": "LOW",
                    "message": "LIMIT without ORDER BY may return inconsistent results",
                    "recommendation": "Add ORDER BY clause to ensure consistent results",
                }
            )

        if "CROSS JOIN" in sql_upper:
            suggestions.append(
                {
                    "type": "CROSS_JOIN",
                    "severity": "HIGH",
                    "message": "CROSS JOIN can produce very large result sets",
                    "recommendation": "Verify that CROSS JOIN is necessary, consider using INNER JOIN with conditions",
                }
            )

        if "WHERE" in sql_upper and "SELECT" in sql_upper[sql_upper.index("WHERE") :]:
            suggestions.append(
                {
                    "type": "SUBQUERY_IN_WHERE",
                    "severity": "MEDIUM",
                    "message": "Subquery in WHERE clause may impact performance",
                    "recommendation": "Consider using JOIN or WITH clause instead",
                }
            )

        if len(referenced_tables) > 5:
            suggestions.append(
                {
                    "type": "MANY_TABLES",
                    "severity": "MEDIUM",
                    "message": f"Query references {len(referenced_tables)} tables",
                    "recommendation": "Consider creating intermediate tables or materialized views for complex joins",
                }
            )

        return suggestions
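
    As a concrete illustration of the heuristics above (the table names are made up, and bytes_processed is kept at zero so the scan-size check stays quiet):

        suggestions = _build_suggestions(
            sql="SELECT * FROM `shop.orders` CROSS JOIN `shop.customers` LIMIT 10",
            bytes_processed=0,
            referenced_tables=[],
            bytes_per_gib=1024**3,
        )
        print([s["type"] for s in suggestions])
        # ['SELECT_STAR', 'LIMIT_WITHOUT_ORDER', 'CROSS_JOIN']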

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caron14/mcp-bigquery'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.