# bq_analyze_query_performance
Analyze BigQuery SQL query performance to identify optimization opportunities and improve execution efficiency.
## Instructions
Analyze query performance and provide optimization suggestions
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | The SQL query to analyze | |
| project_id | No | GCP project ID (uses default if not provided) | |
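
For example, an MCP client might pass arguments like the following. This is a hedged sketch; the query and project ID are illustrative placeholders, not values from this repository:

```python
# Illustrative arguments for bq_analyze_query_performance.
arguments = {
    "sql": (
        "SELECT name, state "
        "FROM `bigquery-public-data.usa_names.usa_1910_2013` "
        "WHERE year > 2000"
    ),
    "project_id": "my-gcp-project",  # optional; omit to use the client's default project
}
```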
## Implementation Reference
- Core handler implementation for `bq_analyze_query_performance`: performs a BigQuery dry run, calculates the estimated cost and referenced tables, and generates a performance score, rating, and optimization suggestions.

```python
async def analyze_query_performance(
    sql: str,
    project_id: str | None = None,
) -> dict[str, Any]:
    """Dry-run a query and provide performance insights."""
    try:
        request = validate_request(
            AnalyzePerformanceRequest,
            {"sql": sql, "project_id": project_id},
        )
    except MCPBigQueryError as exc:
        return {"error": format_error_response(exc)}

    try:
        return await _analyze_query_performance_impl(request)
    except MCPBigQueryError as exc:
        return {"error": format_error_response(exc)}
    except Exception as exc:  # pragma: no cover - defensive guard
        logger.exception("Unexpected error during performance analysis")
        wrapped = MCPBigQueryError(str(exc), code="PERFORMANCE_ANALYSIS_ERROR")
        return {"error": format_error_response(wrapped)}


async def _analyze_query_performance_impl(request: AnalyzePerformanceRequest) -> dict[str, Any]:
    client = get_bigquery_client(project_id=request.project_id)
    project = request.project_id or client.project

    # Dry run: BigQuery validates the query and returns byte estimates
    # without executing it.
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    try:
        query_job = client.query(request.sql, job_config=job_config)
    except BadRequest as exc:
        raise MCPBigQueryError(str(exc), code="ANALYSIS_ERROR") from exc

    bytes_processed = query_job.total_bytes_processed or 0
    bytes_billed = query_job.total_bytes_billed or bytes_processed
    price_per_tib = get_config().price_per_tib
    bytes_per_tib = 1024**4
    bytes_per_gib = 1024**3
    estimated_cost_usd = (bytes_billed / bytes_per_tib) * price_per_tib

    referenced_tables = []
    if query_job.referenced_tables:
        for table_ref in query_job.referenced_tables:
            referenced_tables.append(
                {
                    "project": table_ref.project,
                    "dataset": table_ref.dataset_id,
                    "table": table_ref.table_id,
                    "full_id": f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
                }
            )

    performance_analysis = {
        "bytes_processed": bytes_processed,
        "bytes_billed": bytes_billed,
        "gigabytes_processed": round(bytes_processed / bytes_per_gib, 3),
        "estimated_cost_usd": round(estimated_cost_usd, 6),
        "slot_milliseconds": getattr(query_job, "estimated_bytes_processed", None),
        "referenced_tables": referenced_tables,
        "table_count": len(referenced_tables),
    }

    suggestions = _build_suggestions(request.sql, bytes_processed, referenced_tables, bytes_per_gib)
    score, rating = _score_performance(bytes_processed, suggestions, bytes_per_gib, bytes_per_tib)

    return {
        "query_analysis": performance_analysis,
        "performance_score": score,
        "performance_rating": rating,
        "optimization_suggestions": suggestions,
        "suggestion_count": len(suggestions),
        "estimated_execution": {
            "note": "Actual execution time depends on cluster resources and current load",
            "complexity_indicator": _complexity_indicator(bytes_processed, bytes_per_gib),
        },
        "project": project,
    }
```
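The cost figure above is plain arithmetic over the dry-run byte count. A minimal sketch of that calculation, hard-coding a hypothetical $6.25/TiB on-demand price in place of `get_config().price_per_tib`:

```python
# Hypothetical price; the handler reads the real value from get_config().
price_per_tib = 6.25
bytes_per_tib = 1024**4

bytes_billed = 512 * 1024**3  # suppose the dry run reports 512 GiB billed
estimated_cost_usd = (bytes_billed / bytes_per_tib) * price_per_tib
print(round(estimated_cost_usd, 6))  # 3.125
```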
- Pydantic model defining the input schema and validation for the tool, matching the registered inputSchema.

```python
class AnalyzePerformanceRequest(BaseModel):
    """Request model for analyzing query performance."""

    sql: str = Field(..., min_length=1, max_length=DEFAULT_LIMITS["max_query_length"])
    project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)

    @field_validator("sql")
    @classmethod
    def validate_sql_not_empty(cls, v: str) -> str:
        """Ensure SQL is not just whitespace."""
        if not v.strip():
            raise ValueError("SQL query cannot be empty or only whitespace")
        return v
```
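A minimal standalone sketch of how the validator above behaves. The `MAX_QUERY_LENGTH` and `PROJECT_ID_PATTERN` values here are stand-ins for the package constants, which may differ:

```python
from pydantic import BaseModel, Field, ValidationError, field_validator

# Stand-ins for the package constants; the real values may differ.
MAX_QUERY_LENGTH = 10_000
PROJECT_ID_PATTERN = r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$"

class AnalyzePerformanceRequest(BaseModel):
    sql: str = Field(..., min_length=1, max_length=MAX_QUERY_LENGTH)
    project_id: str | None = Field(None, pattern=PROJECT_ID_PATTERN)

    @field_validator("sql")
    @classmethod
    def validate_sql_not_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("SQL query cannot be empty or only whitespace")
        return v

try:
    AnalyzePerformanceRequest(sql="   ")
except ValidationError as exc:
    # Value error, SQL query cannot be empty or only whitespace
    print(exc.errors()[0]["msg"])
```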
- src/mcp_bigquery/server.py:301-318 (registration): MCP tool registration in `list_tools()`, defining the name, description, and JSON input schema.

```python
types.Tool(
    name="bq_analyze_query_performance",
    description=("Analyze query performance and provide optimization suggestions"),
    inputSchema={
        "type": "object",
        "properties": {
            "sql": {
                "type": "string",
                "description": "The SQL query to analyze",
            },
            "project_id": {
                "type": "string",
                "description": "GCP project ID (uses default if not provided)",
            },
        },
        "required": ["sql"],
    },
),
```
- src/mcp_bigquery/server.py:397-401 (handler): tool dispatch branch in `call_tool()` that invokes `analyze_query_performance` and returns the result as JSON text.

```python
elif name == "bq_analyze_query_performance":
    result = await analyze_query_performance(
        sql=arguments["sql"], project_id=arguments.get("project_id")
    )
    return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
```
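From the client's perspective, the result arrives as a single `TextContent` item whose `text` field holds the JSON document produced above. A sketch of unpacking it, assuming an already-initialized `mcp` `ClientSession` named `session` inside an async context:

```python
import json

# Assumes `session` is an initialized mcp.ClientSession (hypothetical setup).
result = await session.call_tool(
    "bq_analyze_query_performance",
    {"sql": "SELECT 1"},
)
payload = json.loads(result.content[0].text)
print(payload["performance_rating"], payload["suggestion_count"])
```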
- Helper function that generates optimization suggestions from string-pattern heuristics over the SQL text combined with the dry-run results; a usage sketch follows the listing.

```python
def _build_suggestions(
    sql: str,
    bytes_processed: int,
    referenced_tables: list[dict[str, Any]],
    bytes_per_gib: int,
) -> list[dict[str, Any]]:
    sql_upper = sql.upper()
    suggestions: list[dict[str, Any]] = []

    if bytes_processed > 100 * bytes_per_gib:
        suggestions.append(
            {
                "type": "HIGH_DATA_SCAN",
                "severity": "HIGH",
                "message": f"Query will process {round(bytes_processed / bytes_per_gib, 2)} GB of data",
                "recommendation": "Consider adding WHERE clauses, using partitioning, or limiting date ranges",
            }
        )

    if "SELECT *" in sql_upper or "SELECT\n*" in sql_upper:
        suggestions.append(
            {
                "type": "SELECT_STAR",
                "severity": "MEDIUM",
                "message": "Query uses SELECT * which processes all columns",
                "recommendation": "Select only the columns you need to reduce data processed",
            }
        )

    has_limit = "LIMIT" in sql_upper
    has_order_by = "ORDER BY" in sql_upper
    if has_limit and not has_order_by:
        suggestions.append(
            {
                "type": "LIMIT_WITHOUT_ORDER",
                "severity": "LOW",
                "message": "LIMIT without ORDER BY may return inconsistent results",
                "recommendation": "Add ORDER BY clause to ensure consistent results",
            }
        )

    if "CROSS JOIN" in sql_upper:
        suggestions.append(
            {
                "type": "CROSS_JOIN",
                "severity": "HIGH",
                "message": "CROSS JOIN can produce very large result sets",
                "recommendation": "Verify that CROSS JOIN is necessary, consider using INNER JOIN with conditions",
            }
        )

    # Crude heuristic: any SELECT appearing after WHERE is flagged as a subquery.
    if "WHERE" in sql_upper and "SELECT" in sql_upper[sql_upper.index("WHERE"):]:
        suggestions.append(
            {
                "type": "SUBQUERY_IN_WHERE",
                "severity": "MEDIUM",
                "message": "Subquery in WHERE clause may impact performance",
                "recommendation": "Consider using JOIN or WITH clause instead",
            }
        )

    if len(referenced_tables) > 5:
        suggestions.append(
            {
                "type": "MANY_TABLES",
                "severity": "MEDIUM",
                "message": f"Query references {len(referenced_tables)} tables",
                "recommendation": "Consider creating intermediate tables or materialized views for complex joins",
            }
        )

    return suggestions
```
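A quick check of which heuristics fire for a small query, assuming `_build_suggestions` is in scope (for example, pasted from the listing above):

```python
suggestions = _build_suggestions(
    sql="SELECT * FROM dataset.events LIMIT 100",
    bytes_processed=0,
    referenced_tables=[],
    bytes_per_gib=1024**3,
)
print([s["type"] for s in suggestions])  # ['SELECT_STAR', 'LIMIT_WITHOUT_ORDER']
```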