Skip to main content
Glama
performance.py7.4 kB
"""Query performance analysis helpers.""" from __future__ import annotations from typing import Any from google.cloud import bigquery from google.cloud.exceptions import BadRequest from ..bigquery_client import get_bigquery_client from ..config import get_config from ..exceptions import MCPBigQueryError from ..logging_config import get_logger from ..utils import format_error_response from ..validators import AnalyzePerformanceRequest, validate_request logger = get_logger(__name__) async def analyze_query_performance( sql: str, project_id: str | None = None, ) -> dict[str, Any]: """Dry-run a query and provide performance insights.""" try: request = validate_request( AnalyzePerformanceRequest, {"sql": sql, "project_id": project_id}, ) except MCPBigQueryError as exc: return {"error": format_error_response(exc)} try: return await _analyze_query_performance_impl(request) except MCPBigQueryError as exc: return {"error": format_error_response(exc)} except Exception as exc: # pragma: no cover - defensive guard logger.exception("Unexpected error during performance analysis") wrapped = MCPBigQueryError(str(exc), code="PERFORMANCE_ANALYSIS_ERROR") return {"error": format_error_response(wrapped)} async def _analyze_query_performance_impl(request: AnalyzePerformanceRequest) -> dict[str, Any]: client = get_bigquery_client(project_id=request.project_id) project = request.project_id or client.project job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False) try: query_job = client.query(request.sql, job_config=job_config) except BadRequest as exc: raise MCPBigQueryError(str(exc), code="ANALYSIS_ERROR") from exc bytes_processed = query_job.total_bytes_processed or 0 bytes_billed = query_job.total_bytes_billed or bytes_processed price_per_tib = get_config().price_per_tib bytes_per_tib = 1024**4 bytes_per_gib = 1024**3 estimated_cost_usd = (bytes_billed / bytes_per_tib) * price_per_tib referenced_tables = [] if query_job.referenced_tables: for table_ref in query_job.referenced_tables: referenced_tables.append( { "project": table_ref.project, "dataset": table_ref.dataset_id, "table": table_ref.table_id, "full_id": f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}", } ) performance_analysis = { "bytes_processed": bytes_processed, "bytes_billed": bytes_billed, "gigabytes_processed": round(bytes_processed / bytes_per_gib, 3), "estimated_cost_usd": round(estimated_cost_usd, 6), "slot_milliseconds": getattr(query_job, "estimated_bytes_processed", None), "referenced_tables": referenced_tables, "table_count": len(referenced_tables), } suggestions = _build_suggestions(request.sql, bytes_processed, referenced_tables, bytes_per_gib) score, rating = _score_performance(bytes_processed, suggestions, bytes_per_gib, bytes_per_tib) return { "query_analysis": performance_analysis, "performance_score": score, "performance_rating": rating, "optimization_suggestions": suggestions, "suggestion_count": len(suggestions), "estimated_execution": { "note": "Actual execution time depends on cluster resources and current load", "complexity_indicator": _complexity_indicator(bytes_processed, bytes_per_gib), }, "project": project, } def _build_suggestions( sql: str, bytes_processed: int, referenced_tables: list[dict[str, Any]], bytes_per_gib: int, ) -> list[dict[str, Any]]: sql_upper = sql.upper() suggestions: list[dict[str, Any]] = [] if bytes_processed > 100 * bytes_per_gib: suggestions.append( { "type": "HIGH_DATA_SCAN", "severity": "HIGH", "message": f"Query will process {round(bytes_processed / bytes_per_gib, 2)} GB of data", "recommendation": "Consider adding WHERE clauses, using partitioning, or limiting date ranges", } ) if "SELECT *" in sql_upper or "SELECT\n*" in sql_upper: suggestions.append( { "type": "SELECT_STAR", "severity": "MEDIUM", "message": "Query uses SELECT * which processes all columns", "recommendation": "Select only the columns you need to reduce data processed", } ) has_limit = "LIMIT" in sql_upper has_order_by = "ORDER BY" in sql_upper if has_limit and not has_order_by: suggestions.append( { "type": "LIMIT_WITHOUT_ORDER", "severity": "LOW", "message": "LIMIT without ORDER BY may return inconsistent results", "recommendation": "Add ORDER BY clause to ensure consistent results", } ) if "CROSS JOIN" in sql_upper: suggestions.append( { "type": "CROSS_JOIN", "severity": "HIGH", "message": "CROSS JOIN can produce very large result sets", "recommendation": "Verify that CROSS JOIN is necessary, consider using INNER JOIN with conditions", } ) if "WHERE" in sql_upper and "SELECT" in sql_upper[sql_upper.index("WHERE") :]: suggestions.append( { "type": "SUBQUERY_IN_WHERE", "severity": "MEDIUM", "message": "Subquery in WHERE clause may impact performance", "recommendation": "Consider using JOIN or WITH clause instead", } ) if len(referenced_tables) > 5: suggestions.append( { "type": "MANY_TABLES", "severity": "MEDIUM", "message": f"Query references {len(referenced_tables)} tables", "recommendation": "Consider creating intermediate tables or materialized views for complex joins", } ) return suggestions def _score_performance( bytes_processed: int, suggestions: list[dict[str, Any]], bytes_per_gib: int, bytes_per_tib: int, ) -> tuple[int, str]: score = 100 if bytes_processed > 1 * bytes_per_tib: score -= 30 elif bytes_processed > 100 * bytes_per_gib: score -= 20 elif bytes_processed > 10 * bytes_per_gib: score -= 10 for suggestion in suggestions: severity = suggestion["severity"] if severity == "HIGH": score -= 15 elif severity == "MEDIUM": score -= 10 elif severity == "LOW": score -= 5 score = max(0, score) if score >= 80: rating = "EXCELLENT" elif score >= 60: rating = "GOOD" elif score >= 40: rating = "FAIR" else: rating = "NEEDS_OPTIMIZATION" return score, rating def _complexity_indicator(bytes_processed: int, bytes_per_gib: int) -> str: if bytes_processed > 100 * bytes_per_gib: return "HIGH" if bytes_processed > 10 * bytes_per_gib: return "MEDIUM" return "LOW"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caron14/mcp-bigquery'

If you have feedback or need assistance with the MCP directory API, please join our Discord server