Google Ads MCP Server

by johnoconnor0
query_optimizer.py (15.3 kB)
""" Google Ads Query Optimizer GAQL query optimization and validation with: - Query syntax validation - Field existence checking - Query complexity analysis - Automatic pagination - Query performance hints - Field compatibility checking """ import re import logging from typing import List, Dict, Any, Optional, Set, Tuple from enum import Enum from dataclasses import dataclass logger = logging.getLogger(__name__) class QueryComplexity(str, Enum): """Query complexity levels.""" SIMPLE = "simple" MODERATE = "moderate" COMPLEX = "complex" VERY_COMPLEX = "very_complex" @dataclass class QueryAnalysis: """Query analysis result.""" is_valid: bool complexity: QueryComplexity estimated_rows: Optional[int] warnings: List[str] suggestions: List[str] errors: List[str] field_count: int has_aggregation: bool has_segmentation: bool class GAQLValidator: """Validates Google Ads Query Language (GAQL) queries.""" # Common GAQL keywords KEYWORDS = { 'SELECT', 'FROM', 'WHERE', 'ORDER BY', 'LIMIT', 'DURING', 'BETWEEN', 'AND', 'OR', 'NOT', 'IN', 'LIKE', 'IS NULL', 'IS NOT NULL', 'DESC', 'ASC' } # Valid resources in Google Ads API VALID_RESOURCES = { 'customer', 'campaign', 'ad_group', 'ad_group_ad', 'ad_group_criterion', 'keyword_view', 'search_term_view', 'campaign_criterion', 'recommendation', 'customer_client', 'bidding_strategy', 'campaign_budget', 'ad_group_audience_view', 'audience', 'user_list', 'conversion_action', 'asset', 'campaign_asset', 'ad_group_asset', 'extension_feed_item', 'geographic_view', 'age_range_view', 'gender_view', 'landing_page_view', 'shopping_performance_view' } # Metrics fields (frequently updated, should use recent date ranges) METRICS_FIELDS = { 'metrics.clicks', 'metrics.impressions', 'metrics.cost_micros', 'metrics.conversions', 'metrics.conversions_value', 'metrics.average_cpc', 'metrics.average_cpm', 'metrics.ctr', 'metrics.conversion_rate', 'metrics.cost_per_conversion', 'metrics.all_conversions', 'metrics.interactions', 'metrics.engagement_rate', 'metrics.video_views' } @staticmethod def validate_syntax(query: str) -> Tuple[bool, List[str]]: """ Validate basic GAQL syntax. Args: query: GAQL query string Returns: Tuple of (is_valid, error_messages) """ errors = [] # Check if query is empty if not query or not query.strip(): errors.append("Query cannot be empty") return False, errors # Check for SELECT clause if not re.search(r'\bSELECT\b', query, re.IGNORECASE): errors.append("Query must contain SELECT clause") # Check for FROM clause if not re.search(r'\bFROM\b', query, re.IGNORECASE): errors.append("Query must contain FROM clause") # Check balanced parentheses if query.count('(') != query.count(')'): errors.append("Unbalanced parentheses in query") # Check for common syntax errors if re.search(r'SELECT\s+FROM', query, re.IGNORECASE): errors.append("SELECT clause cannot be empty") if re.search(r'FROM\s+(WHERE|ORDER|LIMIT)', query, re.IGNORECASE): errors.append("FROM clause must specify a resource") # Check for unterminated strings single_quotes = query.count("'") - query.count("\\'") double_quotes = query.count('"') - query.count('\\"') if single_quotes % 2 != 0: errors.append("Unterminated string (single quote)") if double_quotes % 2 != 0: errors.append("Unterminated string (double quote)") return len(errors) == 0, errors @staticmethod def extract_resource(query: str) -> Optional[str]: """ Extract resource name from query. 
Args: query: GAQL query Returns: Resource name or None """ match = re.search(r'\bFROM\s+(\w+)', query, re.IGNORECASE) if match: return match.group(1).lower() return None @staticmethod def extract_fields(query: str) -> List[str]: """ Extract field names from SELECT clause. Args: query: GAQL query Returns: List of field names """ # Extract SELECT clause select_match = re.search( r'\bSELECT\s+(.*?)\s+FROM\b', query, re.IGNORECASE | re.DOTALL ) if not select_match: return [] select_clause = select_match.group(1) # Split by comma (but not within parentheses) fields = [] current_field = '' paren_depth = 0 for char in select_clause: if char == '(': paren_depth += 1 elif char == ')': paren_depth -= 1 elif char == ',' and paren_depth == 0: field = current_field.strip() if field: fields.append(field) current_field = '' continue current_field += char # Add last field field = current_field.strip() if field: fields.append(field) return fields @classmethod def validate_resource(cls, resource: Optional[str]) -> Tuple[bool, Optional[str]]: """ Validate resource name. Args: resource: Resource name Returns: Tuple of (is_valid, error_message) """ if not resource: return False, "No resource specified" if resource.lower() not in cls.VALID_RESOURCES: return False, f"Invalid resource: {resource}. Must be one of: {', '.join(sorted(cls.VALID_RESOURCES))}" return True, None @classmethod def analyze_complexity(cls, query: str) -> QueryComplexity: """ Analyze query complexity. Args: query: GAQL query Returns: Complexity level """ complexity_score = 0 # Factor 1: Number of fields fields = cls.extract_fields(query) field_count = len(fields) if field_count > 20: complexity_score += 3 elif field_count > 10: complexity_score += 2 elif field_count > 5: complexity_score += 1 # Factor 2: WHERE clause complexity where_match = re.search(r'\bWHERE\b', query, re.IGNORECASE) if where_match: complexity_score += 1 # Count conditions and_count = len(re.findall(r'\bAND\b', query, re.IGNORECASE)) or_count = len(re.findall(r'\bOR\b', query, re.IGNORECASE)) condition_count = and_count + or_count if condition_count > 5: complexity_score += 2 elif condition_count > 2: complexity_score += 1 # Factor 3: Segmentation segments_match = re.search(r'\bsegments\.\w+', query, re.IGNORECASE) if segments_match: complexity_score += 1 # Factor 4: ORDER BY order_match = re.search(r'\bORDER BY\b', query, re.IGNORECASE) if order_match: complexity_score += 1 # Determine complexity level if complexity_score >= 7: return QueryComplexity.VERY_COMPLEX elif complexity_score >= 5: return QueryComplexity.COMPLEX elif complexity_score >= 3: return QueryComplexity.MODERATE else: return QueryComplexity.SIMPLE class QueryOptimizer: """Optimizes GAQL queries for performance.""" def __init__(self): """Initialize query optimizer.""" self.validator = GAQLValidator() def analyze_query(self, query: str) -> QueryAnalysis: """ Analyze query and provide optimization suggestions. 
Args: query: GAQL query Returns: Query analysis result """ warnings = [] suggestions = [] errors = [] # Validate syntax is_valid, syntax_errors = self.validator.validate_syntax(query) errors.extend(syntax_errors) # Extract resource resource = self.validator.extract_resource(query) # Validate resource resource_valid, resource_error = self.validator.validate_resource(resource) if not resource_valid: errors.append(resource_error) # Extract and analyze fields fields = self.validator.extract_fields(query) field_count = len(fields) # Check for metrics without date range has_metrics = any( field.startswith('metrics.') or field in self.validator.METRICS_FIELDS for field in fields ) has_date_filter = bool(re.search( r'(DURING|segments\.date)', query, re.IGNORECASE )) if has_metrics and not has_date_filter: warnings.append( "Query includes metrics but no date range. " "Consider adding 'WHERE segments.date DURING LAST_30_DAYS' for better performance." ) # Check for SELECT * if '*' in fields: warnings.append( "Using SELECT * may return unnecessary data. " "Specify only the fields you need for better performance." ) suggestions.append("List specific fields instead of using SELECT *") # Check field count if field_count > 20: warnings.append( f"Query selects {field_count} fields. " "Consider reducing to only necessary fields for better performance." ) elif field_count > 30: errors.append( f"Query selects {field_count} fields, which may exceed API limits. " "Reduce to 30 or fewer fields." ) # Check for LIMIT clause has_limit = bool(re.search(r'\bLIMIT\b', query, re.IGNORECASE)) if not has_limit: suggestions.append( "Add LIMIT clause to prevent accidentally fetching too many rows. " "Example: LIMIT 1000" ) # Check for aggregation has_aggregation = bool(re.search( r'\b(SUM|COUNT|AVG|MAX|MIN)\s*\(', query, re.IGNORECASE )) # Check for segmentation has_segmentation = bool(re.search(r'\bsegments\.\w+', query)) # Analyze complexity complexity = self.validator.analyze_complexity(query) # Complexity-based suggestions if complexity in [QueryComplexity.COMPLEX, QueryComplexity.VERY_COMPLEX]: suggestions.append( "Consider breaking this complex query into multiple simpler queries " "for better performance and maintainability." ) # Resource-specific suggestions if resource == 'search_term_view': if not has_date_filter: warnings.append( "search_term_view queries should always include a date range " "to avoid fetching excessive historical data." ) if resource == 'keyword_view': suggestions.append( "For keyword performance data, consider filtering by campaign_id " "or ad_group_id to reduce result set." ) return QueryAnalysis( is_valid=is_valid and len(errors) == 0, complexity=complexity, estimated_rows=None, # Would require historical stats warnings=warnings, suggestions=suggestions, errors=errors, field_count=field_count, has_aggregation=has_aggregation, has_segmentation=has_segmentation ) def optimize_query(self, query: str) -> str: """ Optimize query by applying best practices. Args: query: Original query Returns: Optimized query """ optimized = query # Add LIMIT if missing if not re.search(r'\bLIMIT\b', optimized, re.IGNORECASE): optimized += ' LIMIT 1000' logger.info("Added LIMIT 1000 to query") # Format query for readability optimized = self._format_query(optimized) return optimized def _format_query(self, query: str) -> str: """ Format query for readability. 
Args: query: Query to format Returns: Formatted query """ # This is a simple formatter - could be enhanced formatted = query.strip() # Add newlines after major clauses formatted = re.sub(r'\s+FROM\s+', '\nFROM ', formatted, flags=re.IGNORECASE) formatted = re.sub(r'\s+WHERE\s+', '\nWHERE ', formatted, flags=re.IGNORECASE) formatted = re.sub(r'\s+ORDER BY\s+', '\nORDER BY ', formatted, flags=re.IGNORECASE) formatted = re.sub(r'\s+LIMIT\s+', '\nLIMIT ', formatted, flags=re.IGNORECASE) return formatted def suggest_indexes(self, resource: str, filters: List[str]) -> List[str]: """ Suggest which fields to filter on for optimal performance. Args: resource: Resource being queried filters: Current filter fields Returns: List of suggested filter fields """ suggestions = [] # Resource-specific index suggestions index_suggestions = { 'campaign': ['campaign.id', 'campaign.status', 'segments.date'], 'ad_group': ['ad_group.id', 'campaign.id', 'ad_group.status'], 'keyword_view': ['ad_group.id', 'campaign.id', 'segments.date'], 'search_term_view': ['segments.date', 'campaign.id'], } if resource in index_suggestions: for suggested_field in index_suggestions[resource]: if suggested_field not in filters: suggestions.append( f"Consider filtering by {suggested_field} for better performance" ) return suggestions # Global optimizer instance _query_optimizer: Optional[QueryOptimizer] = None def get_query_optimizer() -> QueryOptimizer: """Get or create global query optimizer.""" global _query_optimizer if _query_optimizer is None: _query_optimizer = QueryOptimizer() return _query_optimizer def validate_query(query: str) -> QueryAnalysis: """ Validate and analyze a GAQL query. Args: query: GAQL query string Returns: Query analysis result """ optimizer = get_query_optimizer() return optimizer.analyze_query(query) def optimize_query(query: str) -> str: """ Optimize a GAQL query. Args: query: Original query Returns: Optimized query """ optimizer = get_query_optimizer() return optimizer.optimize_query(query)
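
A minimal usage sketch of the module-level helpers above, assuming the file is importable as query_optimizer; the GAQL string is illustrative only:

# Analyze a GAQL query and print the feedback, then apply the basic optimizations.
from query_optimizer import validate_query, optimize_query

gaql = (
    "SELECT campaign.id, campaign.name, metrics.clicks, metrics.impressions "
    "FROM campaign "
    "WHERE segments.date DURING LAST_30_DAYS"
)

analysis = validate_query(gaql)
print("valid:", analysis.is_valid, "complexity:", analysis.complexity)
for warning in analysis.warnings:
    print("warning:", warning)
for suggestion in analysis.suggestions:
    print("suggestion:", suggestion)

# optimize_query() appends LIMIT 1000 when no LIMIT is present and reflows major clauses.
print(optimize_query(gaql))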

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johnoconnor0/google-ads-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.