Skip to main content
Glama
johnoconnor0

Google Ads MCP Server

by johnoconnor0
insights_manager.py23.5 kB
""" Insights Manager Handles AI-powered performance insights, budget optimization, and competitive intelligence. Capabilities: - Performance insights and anomaly detection - Trend analysis and forecasting - Budget pacing and reallocation recommendations - Wasted spend identification - Auction insights and competitive analysis - Industry benchmarking """ from typing import Dict, Any, List, Optional from google.ads.googleads.client import GoogleAdsClient from datetime import datetime, timedelta import statistics class InsightsManager: """Manager for AI-powered insights and competitive intelligence.""" def __init__(self, client: GoogleAdsClient): """Initialize the insights manager. Args: client: Authenticated GoogleAdsClient instance """ self.client = client def get_performance_insights( self, customer_id: str, entity_type: str = "CAMPAIGN", entity_id: Optional[str] = None, date_range: str = "LAST_30_DAYS" ) -> Dict[str, Any]: """Generate AI-powered performance insights. Args: customer_id: Customer ID (without hyphens) entity_type: CAMPAIGN, AD_GROUP, KEYWORD, or AD entity_id: Optional specific entity ID date_range: Date range for analysis Returns: Performance insights with recommendations """ ga_service = self.client.get_service("GoogleAdsService") # Build query based on entity type entity_map = { "CAMPAIGN": "campaign", "AD_GROUP": "ad_group", "KEYWORD": "ad_group_criterion", "AD": "ad_group_ad" } entity = entity_map.get(entity_type.upper(), "campaign") query = f""" SELECT {entity}.id, {entity}.name, metrics.impressions, metrics.clicks, metrics.ctr, metrics.cost_micros, metrics.conversions, metrics.conversions_value, metrics.cost_per_conversion, metrics.search_impression_share, metrics.quality_score FROM {entity} WHERE segments.date DURING {date_range} """ if entity_id: query += f" AND {entity}.id = {entity_id}" query += " ORDER BY metrics.cost_micros DESC LIMIT 100" response = ga_service.search(customer_id=customer_id, query=query) insights = [] for row in 
response: entity_obj = getattr(row, entity) metrics = row.metrics # Calculate performance scores ctr_benchmark = 0.02 # 2% industry average cvr_benchmark = 0.05 # 5% industry average ctr = metrics.ctr cvr = metrics.conversions / metrics.clicks if metrics.clicks > 0 else 0 # Generate insights entity_insights = { 'entity_id': str(entity_obj.id), 'entity_name': entity_obj.name if hasattr(entity_obj, 'name') else 'N/A', 'metrics': { 'impressions': metrics.impressions, 'clicks': metrics.clicks, 'ctr': ctr, 'cost': metrics.cost_micros / 1_000_000, 'conversions': metrics.conversions, 'cost_per_conversion': metrics.cost_per_conversion }, 'insights': [] } # CTR insights if ctr < ctr_benchmark * 0.5: entity_insights['insights'].append({ 'type': 'LOW_CTR', 'severity': 'HIGH', 'message': f'CTR ({ctr:.2%}) is significantly below benchmark ({ctr_benchmark:.2%})', 'recommendation': 'Review ad copy and targeting. Consider testing new ad variations.' }) elif ctr > ctr_benchmark * 1.5: entity_insights['insights'].append({ 'type': 'HIGH_CTR', 'severity': 'POSITIVE', 'message': f'CTR ({ctr:.2%}) is performing well above benchmark', 'recommendation': 'Consider increasing budget to capture more traffic.' }) # Conversion rate insights if cvr < cvr_benchmark * 0.5 and metrics.clicks > 50: entity_insights['insights'].append({ 'type': 'LOW_CONVERSION_RATE', 'severity': 'HIGH', 'message': f'Conversion rate ({cvr:.2%}) is below expected level', 'recommendation': 'Review landing page experience and conversion funnel.' }) # Impression share insights if hasattr(metrics, 'search_impression_share'): is_value = metrics.search_impression_share if is_value < 0.5: entity_insights['insights'].append({ 'type': 'LOW_IMPRESSION_SHARE', 'severity': 'MEDIUM', 'message': f'Only capturing {is_value:.0%} of available impressions', 'recommendation': 'Increase budget or improve ad rank to capture more impressions.' 
}) # Quality score insights if hasattr(metrics, 'quality_score') and metrics.quality_score < 5: entity_insights['insights'].append({ 'type': 'LOW_QUALITY_SCORE', 'severity': 'HIGH', 'message': f'Quality Score ({metrics.quality_score}/10) needs improvement', 'recommendation': 'Improve ad relevance, expected CTR, and landing page experience.' }) if entity_insights['insights']: insights.append(entity_insights) return { 'entity_type': entity_type, 'total_analyzed': len(list(response)), 'insights_count': len(insights), 'insights': insights } def analyze_trends( self, customer_id: str, campaign_id: Optional[str] = None, lookback_days: int = 30 ) -> Dict[str, Any]: """Analyze performance trends and detect anomalies. Args: customer_id: Customer ID (without hyphens) campaign_id: Optional campaign ID filter lookback_days: Number of days to analyze Returns: Trend analysis with anomaly detection """ ga_service = self.client.get_service("GoogleAdsService") # Get daily performance data query = f""" SELECT segments.date, metrics.impressions, metrics.clicks, metrics.ctr, metrics.cost_micros, metrics.conversions FROM campaign WHERE segments.date DURING LAST_{lookback_days}_DAYS """ if campaign_id: query += f" AND campaign.id = {campaign_id}" query += " ORDER BY segments.date" response = ga_service.search(customer_id=customer_id, query=query) # Collect daily data daily_data = [] for row in response: daily_data.append({ 'date': str(row.segments.date), 'impressions': row.metrics.impressions, 'clicks': row.metrics.clicks, 'ctr': row.metrics.ctr, 'cost': row.metrics.cost_micros / 1_000_000, 'conversions': row.metrics.conversions }) if not daily_data: return {'error': 'No data available for trend analysis'} # Calculate trends costs = [d['cost'] for d in daily_data] conversions = [d['conversions'] for d in daily_data] ctrs = [d['ctr'] for d in daily_data] # Simple trend detection (comparing first half vs second half) mid_point = len(daily_data) // 2 cost_first_half = 
sum(costs[:mid_point]) / mid_point if mid_point > 0 else 0 cost_second_half = sum(costs[mid_point:]) / (len(costs) - mid_point) if mid_point < len(costs) else 0 conv_first_half = sum(conversions[:mid_point]) / mid_point if mid_point > 0 else 0 conv_second_half = sum(conversions[mid_point:]) / (len(conversions) - mid_point) if mid_point < len(conversions) else 0 # Detect anomalies (values beyond 2 standard deviations) anomalies = [] if len(costs) > 3: cost_mean = statistics.mean(costs) cost_stdev = statistics.stdev(costs) for i, day in enumerate(daily_data): if abs(day['cost'] - cost_mean) > 2 * cost_stdev: anomalies.append({ 'date': day['date'], 'metric': 'cost', 'value': day['cost'], 'deviation': abs(day['cost'] - cost_mean) / cost_stdev }) # Calculate trend direction cost_trend = "INCREASING" if cost_second_half > cost_first_half * 1.1 else \ "DECREASING" if cost_second_half < cost_first_half * 0.9 else "STABLE" conv_trend = "INCREASING" if conv_second_half > conv_first_half * 1.1 else \ "DECREASING" if conv_second_half < conv_first_half * 0.9 else "STABLE" return { 'lookback_days': lookback_days, 'data_points': len(daily_data), 'trends': { 'cost_trend': cost_trend, 'cost_change_pct': ((cost_second_half - cost_first_half) / cost_first_half * 100) if cost_first_half > 0 else 0, 'conversion_trend': conv_trend, 'conversion_change_pct': ((conv_second_half - conv_first_half) / conv_first_half * 100) if conv_first_half > 0 else 0 }, 'anomalies': anomalies, 'daily_data': daily_data } def get_budget_pacing( self, customer_id: str, campaign_id: str ) -> Dict[str, Any]: """Analyze budget pacing and spending velocity. 
Args: customer_id: Customer ID (without hyphens) campaign_id: Campaign ID Returns: Budget pacing analysis """ ga_service = self.client.get_service("GoogleAdsService") # Get campaign budget and current month spend query = f""" SELECT campaign.id, campaign.name, campaign_budget.amount_micros, campaign_budget.period, metrics.cost_micros FROM campaign WHERE campaign.id = {campaign_id} AND segments.date DURING THIS_MONTH """ response = ga_service.search(customer_id=customer_id, query=query) results = list(response) if not results: return {'error': 'Campaign not found or no data available'} row = results[0] # Calculate pacing now = datetime.now() days_in_month = (datetime(now.year, now.month + 1, 1) - timedelta(days=1)).day if now.month < 12 else 31 days_elapsed = now.day days_remaining = days_in_month - days_elapsed budget_period = row.campaign_budget.period.name if budget_period == "DAILY": daily_budget = row.campaign_budget.amount_micros / 1_000_000 monthly_budget = daily_budget * days_in_month else: monthly_budget = row.campaign_budget.amount_micros / 1_000_000 current_spend = row.metrics.cost_micros / 1_000_000 expected_spend = (monthly_budget / days_in_month) * days_elapsed pace_percentage = (current_spend / expected_spend * 100) if expected_spend > 0 else 0 projected_spend = (current_spend / days_elapsed) * days_in_month if days_elapsed > 0 else 0 # Determine pacing status if pace_percentage > 120: status = "OVERPACING" message = f"Spending {pace_percentage - 100:.0f}% faster than budget allows" elif pace_percentage < 80: status = "UNDERPACING" message = f"Spending {100 - pace_percentage:.0f}% slower than expected" else: status = "ON_TRACK" message = "Budget pacing is healthy" return { 'campaign_id': campaign_id, 'campaign_name': row.campaign.name, 'budget_period': budget_period, 'monthly_budget': monthly_budget, 'current_spend': current_spend, 'expected_spend': expected_spend, 'projected_spend': projected_spend, 'pace_percentage': pace_percentage, 'days_elapsed': 
days_elapsed, 'days_remaining': days_remaining, 'status': status, 'message': message } def get_budget_recommendations( self, customer_id: str, date_range: str = "LAST_30_DAYS" ) -> List[Dict[str, Any]]: """Generate budget reallocation recommendations. Args: customer_id: Customer ID (without hyphens) date_range: Date range for analysis Returns: List of budget recommendations """ ga_service = self.client.get_service("GoogleAdsService") query = f""" SELECT campaign.id, campaign.name, campaign_budget.amount_micros, metrics.cost_micros, metrics.conversions, metrics.cost_per_conversion, metrics.search_budget_lost_impression_share, metrics.conversions_value FROM campaign WHERE segments.date DURING {date_range} AND campaign.status = 'ENABLED' ORDER BY metrics.cost_micros DESC """ response = ga_service.search(customer_id=customer_id, query=query) recommendations = [] for row in response: campaign = row.campaign metrics = row.metrics budget = row.campaign_budget daily_budget = budget.amount_micros / 1_000_000 avg_daily_spend = (metrics.cost_micros / 1_000_000) / 30 # Approximate # Identify budget-constrained campaigns if metrics.search_budget_lost_impression_share > 0.2: lost_is = metrics.search_budget_lost_impression_share recommended_increase = daily_budget * (lost_is / (1 - lost_is)) recommendations.append({ 'campaign_id': str(campaign.id), 'campaign_name': campaign.name, 'type': 'INCREASE_BUDGET', 'priority': 'HIGH', 'current_budget': daily_budget, 'recommended_budget': daily_budget + recommended_increase, 'increase_amount': recommended_increase, 'reason': f'Losing {lost_is:.0%} of impressions due to budget', 'expected_impact': f'Could capture {lost_is:.0%} more impressions' }) # Identify underperforming campaigns with high spend if metrics.conversions == 0 and metrics.cost_micros > 50_000_000: # $50+ recommendations.append({ 'campaign_id': str(campaign.id), 'campaign_name': campaign.name, 'type': 'DECREASE_BUDGET', 'priority': 'MEDIUM', 'current_budget': daily_budget, 
'recommended_budget': daily_budget * 0.5, 'decrease_amount': daily_budget * 0.5, 'reason': 'No conversions despite significant spend', 'expected_impact': 'Reduce wasted spend, reallocate to performing campaigns' }) # Identify high ROAS campaigns if metrics.conversions > 0: roas = metrics.conversions_value / (metrics.cost_micros / 1_000_000) if roas > 4.0 and avg_daily_spend < daily_budget * 0.8: recommendations.append({ 'campaign_id': str(campaign.id), 'campaign_name': campaign.name, 'type': 'INCREASE_BUDGET', 'priority': 'HIGH', 'current_budget': daily_budget, 'recommended_budget': daily_budget * 1.3, 'increase_amount': daily_budget * 0.3, 'reason': f'Strong ROAS ({roas:.1f}x) with budget headroom', 'expected_impact': 'Scale winning campaign for more revenue' }) return sorted(recommendations, key=lambda x: 1 if x['priority'] == 'HIGH' else 2) def analyze_wasted_spend( self, customer_id: str, date_range: str = "LAST_30_DAYS", min_cost: float = 10.0 ) -> Dict[str, Any]: """Identify sources of wasted ad spend. 
Args: customer_id: Customer ID (without hyphens) date_range: Date range for analysis min_cost: Minimum cost threshold for analysis Returns: Wasted spend analysis """ ga_service = self.client.get_service("GoogleAdsService") # Analyze keywords with high cost but no conversions keyword_query = f""" SELECT ad_group.id, ad_group.name, campaign.name, ad_group_criterion.keyword.text, ad_group_criterion.keyword.match_type, metrics.cost_micros, metrics.clicks, metrics.conversions FROM keyword_view WHERE segments.date DURING {date_range} AND metrics.cost_micros > {int(min_cost * 1_000_000)} ORDER BY metrics.cost_micros DESC LIMIT 50 """ keyword_response = ga_service.search(customer_id=customer_id, query=keyword_query) wasted_keywords = [] for row in keyword_response: cost = row.metrics.cost_micros / 1_000_000 if row.metrics.conversions == 0 and cost > min_cost: wasted_keywords.append({ 'keyword': row.ad_group_criterion.keyword.text, 'match_type': row.ad_group_criterion.keyword.match_type.name, 'campaign': row.campaign.name, 'ad_group': row.ad_group.name, 'cost': cost, 'clicks': row.metrics.clicks, 'conversions': row.metrics.conversions }) total_wasted = sum(k['cost'] for k in wasted_keywords) # Categorize waste types waste_categories = { 'non_converting_keywords': { 'count': len(wasted_keywords), 'cost': total_wasted, 'description': 'Keywords with spend but no conversions' } } return { 'date_range': date_range, 'total_wasted_spend': total_wasted, 'waste_categories': waste_categories, 'top_wasters': wasted_keywords[:10], 'recommendations': [ 'Add non-converting keywords as negatives', 'Review keyword match types (consider using phrase/exact instead of broad)', 'Reallocate budget to converting keywords' ] } def get_auction_insights( self, customer_id: str, campaign_id: str, date_range: str = "LAST_30_DAYS" ) -> Dict[str, Any]: """Get auction insights and competitive data. 
Args: customer_id: Customer ID (without hyphens) campaign_id: Campaign ID date_range: Date range Returns: Auction insights data """ ga_service = self.client.get_service("GoogleAdsService") query = f""" SELECT campaign.id, campaign.name, metrics.search_impression_share, metrics.search_rank_lost_impression_share, metrics.search_budget_lost_impression_share, metrics.search_top_impression_share, metrics.search_absolute_top_impression_share, metrics.search_exact_match_impression_share FROM campaign WHERE campaign.id = {campaign_id} AND segments.date DURING {date_range} """ response = ga_service.search(customer_id=customer_id, query=query) results = list(response) if not results: return {'error': 'No auction insights data available'} row = results[0] metrics = row.metrics # Calculate competitive position total_is = metrics.search_impression_share rank_lost = metrics.search_rank_lost_impression_share budget_lost = metrics.search_budget_lost_impression_share competitive_position = "STRONG" if total_is > 0.7 else \ "MODERATE" if total_is > 0.4 else "WEAK" # Identify primary constraint if budget_lost > rank_lost: primary_constraint = "BUDGET" constraint_message = f"Budget is limiting visibility ({budget_lost:.0%} lost)" elif rank_lost > budget_lost: primary_constraint = "AD_RANK" constraint_message = f"Ad rank is limiting visibility ({rank_lost:.0%} lost)" else: primary_constraint = "NONE" constraint_message = "No significant constraints" return { 'campaign_id': campaign_id, 'campaign_name': row.campaign.name, 'impression_share': total_is, 'top_impression_share': metrics.search_top_impression_share, 'absolute_top_impression_share': metrics.search_absolute_top_impression_share, 'rank_lost_is': rank_lost, 'budget_lost_is': budget_lost, 'competitive_position': competitive_position, 'primary_constraint': primary_constraint, 'constraint_message': constraint_message, 'recommendations': self._get_auction_recommendations( total_is, rank_lost, budget_lost, 
metrics.search_top_impression_share ) } def _get_auction_recommendations( self, impression_share: float, rank_lost: float, budget_lost: float, top_is: float ) -> List[str]: """Generate auction-specific recommendations.""" recommendations = [] if budget_lost > 0.15: recommendations.append("Increase daily budget to capture more impressions") if rank_lost > 0.15: recommendations.append("Improve Quality Score or increase bids to improve ad rank") if top_is < 0.3: recommendations.append("Increase bids to show in top positions more often") if impression_share < 0.5: recommendations.append("Review targeting settings - may be too restrictive") return recommendations if recommendations else ["Continue current strategy"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johnoconnor0/google-ads-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.