Skip to main content
Glama
johnoconnor0

Google Ads MCP Server

by johnoconnor0
batch_operations_manager.py (33.5 kB)
"""
Batch Operations Manager

Handles bulk operations for campaigns, ad groups, keywords, and ads.

Capabilities:
- Batch creation (campaigns, ad groups, keywords, responsive search ads)
- Batch updates (budgets, bids, statuses)
- CSV import/export (campaigns and keywords)
- Partial failure handling with per-item error reporting
"""

from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
)
from dataclasses import dataclass
import csv
import io
from enum import Enum

if TYPE_CHECKING:
    # Needed for the type annotation only. Keeping it out of the runtime import
    # path lets this module be imported (and exercised with a stand-in client)
    # without the google-ads package installed.
    from google.ads.googleads.client import GoogleAdsClient

# Google Ads expresses money in micros: one currency unit == 1,000,000 micros.
_MICROS_PER_UNIT = 1_000_000

# Reported when the API returns an empty result for an operation without
# attaching an error that names that operation.
_UNEXPLAINED_FAILURE = 'Operation failed without error details'


class OperationStatus(str, Enum):
    """Status of a batch operation."""

    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    PARTIAL = "PARTIAL"


@dataclass
class BatchResult:
    """Result of a batch operation.

    Attributes:
        total: Number of items submitted by the caller.
        succeeded: Number of items that were applied.
        failed: Number of items that were not applied (total - succeeded).
        status: SUCCESS when nothing failed, FAILED when nothing succeeded,
            PARTIAL otherwise.
        results: One dict per successful item.
        errors: One dict per reported error. Per-item errors carry an
            'index' into the submitted list; request-level errors only
            carry 'error'.
    """

    total: int
    succeeded: int
    failed: int
    status: OperationStatus
    results: List[Dict[str, Any]]
    errors: List[Dict[str, Any]]


class BatchOperationsManager:
    """Manager for batch operations on Google Ads entities."""

    # entity_type -> (service name, operation type, status enum name,
    #                 resource path builder, mutate method, request type,
    #                 whether the resource path also needs 'ad_group_id')
    _STATUS_TARGETS = {
        'campaign': (
            "CampaignService", "CampaignOperation", "CampaignStatusEnum",
            "campaign_path", "mutate_campaigns", "MutateCampaignsRequest",
            False,
        ),
        'ad_group': (
            "AdGroupService", "AdGroupOperation", "AdGroupStatusEnum",
            "ad_group_path", "mutate_ad_groups", "MutateAdGroupsRequest",
            False,
        ),
        'keyword': (
            "AdGroupCriterionService", "AdGroupCriterionOperation",
            "AdGroupCriterionStatusEnum", "ad_group_criterion_path",
            "mutate_ad_group_criteria", "MutateAdGroupCriteriaRequest",
            True,
        ),
        'ad': (
            "AdGroupAdService", "AdGroupAdOperation", "AdGroupAdStatusEnum",
            "ad_group_ad_path", "mutate_ad_group_ads",
            "MutateAdGroupAdsRequest",
            True,
        ),
    }

    def __init__(self, client: "GoogleAdsClient"):
        """Initialize the batch operations manager.

        Args:
            client: Authenticated GoogleAdsClient instance
        """
        self.client = client

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_micros(amount: Union[int, float]) -> int:
        """Convert a currency amount to micros.

        Rounds instead of truncating so that binary floating-point error in
        the product cannot shave a micro off the requested amount.
        """
        return round(amount * _MICROS_PER_UNIT)

    @staticmethod
    def _numeric_id(value: Any) -> str:
        """Return ``value`` as a digit-only string.

        IDs are interpolated into GAQL text, so anything that is not purely
        numeric is rejected rather than sent to the API.

        Raises:
            ValueError: If ``value`` is not made of ASCII digits.
        """
        text = str(value).strip()
        if not (text.isascii() and text.isdigit()):
            raise ValueError(f"Expected a numeric ID, got {value!r}")
        return text

    @staticmethod
    def _overall_status(succeeded: int, failed: int) -> OperationStatus:
        """Derive the batch status from the success/failure counts."""
        if failed == 0:
            return OperationStatus.SUCCESS
        if succeeded == 0:
            return OperationStatus.FAILED
        return OperationStatus.PARTIAL

    @staticmethod
    def _label(item: Any, field: str) -> Any:
        """Return ``item[field]`` for error reports, or 'Unknown'."""
        if isinstance(item, dict):
            return item.get(field, 'Unknown')
        return 'Unknown'

    @staticmethod
    def _set_update_mask(operation: Any, *paths: str) -> None:
        """Record which fields an update operation is allowed to change.

        The mask is filled in place on the operation itself;
        ``client.get_type`` cannot build a FieldMask and accepts no field
        keyword arguments.
        """
        operation.update_mask.paths.extend(paths)

    @staticmethod
    def _operation_index(error: Any) -> Optional[int]:
        """Return the index of the operation an API error refers to.

        The first field path element of the error location identifies the
        entry in the request's ``operations`` list. Returns None when the
        error carries no usable location.
        """
        location = getattr(error, 'location', None)
        elements = getattr(location, 'field_path_elements', None)
        if not elements:
            return None
        index = getattr(elements[0], 'index', None)
        return index if isinstance(index, int) else None

    def _unpack_failure(self, detail: Any) -> Any:
        """Decode a packed status detail into a GoogleAdsFailure message."""
        failure_type = type(self.client.get_type("GoogleAdsFailure"))
        # proto-plus classes expose ``deserialize``; plain protobuf classes
        # (use_proto_plus=False) expose ``FromString``.
        decode = getattr(failure_type, 'deserialize', None)
        if decode is None:
            decode = failure_type.FromString
        return decode(detail.value)

    def _partial_failures(
        self, response: Any
    ) -> Iterator[Tuple[Optional[int], str]]:
        """Yield ``(operation_index, message)`` for each partial-failure error.

        ``operation_index`` is None when the error does not say which
        operation it belongs to.
        """
        status = getattr(response, 'partial_failure_error', None)
        for detail in getattr(status, 'details', None) or ():
            failure = detail
            if not hasattr(detail, 'errors'):
                # Details normally arrive as packed messages that have to be
                # decoded before their error list is visible.
                try:
                    failure = self._unpack_failure(detail)
                except Exception:
                    # An undecodable detail must not hide the outcome of the
                    # batch; the affected operations are still reported as
                    # failed, with a generic message.
                    continue
            for error in getattr(failure, 'errors', None) or ():
                message = getattr(error, 'message', None) or str(error)
                yield self._operation_index(error), message

    def _send(
        self,
        method: Callable[..., Any],
        request_type: str,
        customer_id: str,
        operations: List[Any],
    ) -> Any:
        """Send ``operations`` in one mutate request with partial failure on.

        ``partial_failure`` is a field of the request message rather than a
        keyword argument of the service method, so the request object is
        built explicitly.
        """
        request = self.client.get_type(request_type)
        request.customer_id = customer_id
        request.operations.extend(operations)
        request.partial_failure = True
        return method(request=request)

    def _run_batch(
        self,
        items: List[Dict[str, Any]],
        build_operation: Callable[[Dict[str, Any]], Any],
        send: Callable[[List[Any]], Any],
        describe_success: Callable[[int, Dict[str, Any], str], Dict[str, Any]],
        label_key: str,
        label_field: str,
    ) -> BatchResult:
        """Build, send and summarise one batch.

        Args:
            items: Caller-supplied configurations, one per entity.
            build_operation: Turns one item into a mutate operation. An
                exception here fails only that item.
            send: Sends the built operations and returns the API response.
            describe_success: Builds the result dict for a successful item
                from ``(item_index, item, resource_name)``.
            label_key: Key under which error entries identify the item.
            label_field: Field of the item used as that identification.

        Returns:
            BatchResult where ``succeeded + failed == total``.
        """
        total = len(items)
        results: List[Dict[str, Any]] = []
        errors: List[Dict[str, Any]] = []

        def error_entry(index: Optional[int], message: str) -> Dict[str, Any]:
            known = index is not None and 0 <= index < total
            label = self._label(items[index], label_field) if known else 'Unknown'
            return {'index': index, label_key: label, 'error': message}

        # positions[k] is the index in ``items`` of operations[k]; the two
        # diverge as soon as one item fails to build.
        operations: List[Any] = []
        positions: List[int] = []
        for index, item in enumerate(items):
            try:
                operation = build_operation(item)
            except Exception as exc:
                # Boundary: a malformed item is reported, not raised, so the
                # remaining items are still processed.
                errors.append(error_entry(index, f"{type(exc).__name__}: {exc}"))
            else:
                operations.append(operation)
                positions.append(index)

        # Nothing to send for an empty batch or when every item failed to build.
        if operations:
            try:
                response = send(operations)
            except Exception as exc:
                # The whole request was rejected; nothing was applied.
                errors.append({'error': str(exc)})
            else:
                explained = set()
                for op_index, message in self._partial_failures(response):
                    in_request = (
                        op_index is not None and 0 <= op_index < len(positions)
                    )
                    item_index = positions[op_index] if in_request else None
                    errors.append(error_entry(item_index, message))
                    if in_request:
                        explained.add(item_index)

                returned = list(getattr(response, 'results', None) or ())
                for op_index, item_index in enumerate(positions):
                    resource_name = ''
                    if op_index < len(returned):
                        resource_name = getattr(
                            returned[op_index], 'resource_name', ''
                        )
                    if resource_name:
                        results.append(
                            describe_success(
                                item_index, items[item_index], resource_name
                            )
                        )
                    elif item_index not in explained:
                        errors.append(
                            error_entry(item_index, _UNEXPLAINED_FAILURE)
                        )

        succeeded = len(results)
        failed = total - succeeded
        return BatchResult(
            total=total,
            succeeded=succeeded,
            failed=failed,
            status=self._overall_status(succeeded, failed),
            results=results,
            errors=errors,
        )

    # ------------------------------------------------------------------
    # Batch creation
    # ------------------------------------------------------------------

    def batch_create_campaigns(
        self,
        customer_id: str,
        campaigns: List[Dict[str, Any]]
    ) -> BatchResult:
        """Create multiple campaigns in a single batch operation.

        Each configuration requires 'name' and accepts 'type' (default
        'SEARCH'), 'status' (default 'PAUSED'), 'budget_amount',
        'bidding_strategy' ('MAXIMIZE_CONVERSIONS', 'TARGET_CPA', anything
        else means manual CPC) and 'target_cpa'.

        A budget is created with its own request before the campaign batch
        is sent; it is not removed again if the campaign itself is rejected.

        Args:
            customer_id: Customer ID (without hyphens)
            campaigns: List of campaign configurations

        Returns:
            BatchResult with success/failure details
        """
        campaign_service = self.client.get_service("CampaignService")
        budget_service = self.client.get_service("CampaignBudgetService")
        enums = self.client.enums

        def build(campaign_data: Dict[str, Any]) -> Any:
            operation = self.client.get_type("CampaignOperation")
            campaign = operation.create

            # Basic settings
            campaign.name = campaign_data['name']
            campaign.advertising_channel_type = enums.AdvertisingChannelTypeEnum[
                campaign_data.get('type', 'SEARCH')
            ]
            campaign.status = enums.CampaignStatusEnum[
                campaign_data.get('status', 'PAUSED')
            ]

            # Bidding strategy
            strategy = campaign_data.get('bidding_strategy')
            if strategy == 'MAXIMIZE_CONVERSIONS':
                campaign.maximize_conversions.target_cpa_micros = self._to_micros(
                    campaign_data.get('target_cpa', 0)
                )
            elif strategy == 'TARGET_CPA':
                campaign.target_cpa.target_cpa_micros = self._to_micros(
                    campaign_data.get('target_cpa', 0)
                )
            else:
                # Default to manual CPC
                campaign.manual_cpc.enhanced_cpc_enabled = True

            # The budget is created last so that an invalid setting above is
            # detected before anything is written to the account.
            if 'budget_amount' in campaign_data:
                budget_operation = self.client.get_type("CampaignBudgetOperation")
                budget = budget_operation.create
                budget.name = f"{campaign_data['name']} Budget"
                budget.amount_micros = self._to_micros(
                    campaign_data['budget_amount']
                )
                budget.delivery_method = enums.BudgetDeliveryMethodEnum.STANDARD
                budget_response = budget_service.mutate_campaign_budgets(
                    customer_id=customer_id,
                    operations=[budget_operation]
                )
                campaign.campaign_budget = budget_response.results[0].resource_name

            return operation

        def describe(
            index: int, campaign_data: Dict[str, Any], resource_name: str
        ) -> Dict[str, Any]:
            return {
                'index': index,
                'campaign_name': campaign_data['name'],
                'resource_name': resource_name,
                'campaign_id': resource_name.split('/')[-1],
                'status': 'success'
            }

        def send(operations: List[Any]) -> Any:
            return self._send(
                campaign_service.mutate_campaigns,
                "MutateCampaignsRequest",
                customer_id,
                operations,
            )

        return self._run_batch(
            campaigns, build, send, describe, 'campaign_name', 'name'
        )

    def batch_create_ad_groups(
        self,
        customer_id: str,
        ad_groups: List[Dict[str, Any]]
    ) -> BatchResult:
        """Create multiple ad groups in a single batch operation.

        Each configuration requires 'name' and 'campaign_id' and accepts
        'status' (default 'PAUSED') and 'cpc_bid'.

        Args:
            customer_id: Customer ID (without hyphens)
            ad_groups: List of ad group configurations

        Returns:
            BatchResult with success/failure details
        """
        ad_group_service = self.client.get_service("AdGroupService")
        campaign_service = self.client.get_service("CampaignService")

        def build(ag_data: Dict[str, Any]) -> Any:
            operation = self.client.get_type("AdGroupOperation")
            ad_group = operation.create
            ad_group.name = ag_data['name']
            ad_group.campaign = campaign_service.campaign_path(
                customer_id, ag_data['campaign_id']
            )
            ad_group.status = self.client.enums.AdGroupStatusEnum[
                ag_data.get('status', 'PAUSED')
            ]
            # A bid that is present but None means "no bid", same as absent.
            if ag_data.get('cpc_bid') is not None:
                ad_group.cpc_bid_micros = self._to_micros(ag_data['cpc_bid'])
            return operation

        def describe(
            index: int, ag_data: Dict[str, Any], resource_name: str
        ) -> Dict[str, Any]:
            return {
                'index': index,
                'ad_group_name': ag_data['name'],
                'resource_name': resource_name,
                'ad_group_id': resource_name.split('/')[-1],
                'status': 'success'
            }

        def send(operations: List[Any]) -> Any:
            return self._send(
                ad_group_service.mutate_ad_groups,
                "MutateAdGroupsRequest",
                customer_id,
                operations,
            )

        return self._run_batch(
            ad_groups, build, send, describe, 'ad_group_name', 'name'
        )

    def batch_add_keywords(
        self,
        customer_id: str,
        keywords: List[Dict[str, Any]]
    ) -> BatchResult:
        """Add multiple keywords in a single batch operation.

        Each configuration requires 'ad_group_id' and 'text' and accepts
        'match_type' (default 'BROAD') and 'cpc_bid'. Keywords are created
        with status ENABLED.

        Args:
            customer_id: Customer ID (without hyphens)
            keywords: List of keyword configurations

        Returns:
            BatchResult with success/failure details
        """
        criterion_service = self.client.get_service("AdGroupCriterionService")
        ad_group_service = self.client.get_service("AdGroupService")
        enums = self.client.enums

        def build(kw_data: Dict[str, Any]) -> Any:
            operation = self.client.get_type("AdGroupCriterionOperation")
            criterion = operation.create
            criterion.ad_group = ad_group_service.ad_group_path(
                customer_id, kw_data['ad_group_id']
            )
            criterion.status = enums.AdGroupCriterionStatusEnum.ENABLED
            criterion.keyword.text = kw_data['text']
            criterion.keyword.match_type = enums.KeywordMatchTypeEnum[
                kw_data.get('match_type', 'BROAD')
            ]
            # A bid that is present but None means "no bid", same as absent.
            if kw_data.get('cpc_bid') is not None:
                criterion.cpc_bid_micros = self._to_micros(kw_data['cpc_bid'])
            return operation

        def describe(
            index: int, kw_data: Dict[str, Any], resource_name: str
        ) -> Dict[str, Any]:
            return {
                'index': index,
                'keyword_text': kw_data['text'],
                'match_type': kw_data.get('match_type', 'BROAD'),
                'resource_name': resource_name,
                'keyword_id': resource_name.split('~')[-1],
                'status': 'success'
            }

        def send(operations: List[Any]) -> Any:
            return self._send(
                criterion_service.mutate_ad_group_criteria,
                "MutateAdGroupCriteriaRequest",
                customer_id,
                operations,
            )

        return self._run_batch(
            keywords, build, send, describe, 'keyword_text', 'text'
        )

    def batch_create_ads(
        self,
        customer_id: str,
        ads: List[Dict[str, Any]]
    ) -> BatchResult:
        """Create multiple responsive search ads in a single batch operation.

        Each configuration requires 'ad_group_id', 'headlines' and
        'descriptions' (lists of strings) and accepts 'final_urls'. Ads are
        created with status PAUSED.

        Args:
            customer_id: Customer ID (without hyphens)
            ads: List of ad configurations

        Returns:
            BatchResult with success/failure details
        """
        ad_group_ad_service = self.client.get_service("AdGroupAdService")
        ad_group_service = self.client.get_service("AdGroupService")

        def text_asset(text: str) -> Any:
            # get_type takes no field keyword arguments, so the asset is
            # created empty and filled in afterwards.
            asset = self.client.get_type("AdTextAsset")
            asset.text = text
            return asset

        def build(ad_data: Dict[str, Any]) -> Any:
            operation = self.client.get_type("AdGroupAdOperation")
            ad_group_ad = operation.create
            ad_group_ad.ad_group = ad_group_service.ad_group_path(
                customer_id, ad_data['ad_group_id']
            )
            ad_group_ad.status = self.client.enums.AdGroupAdStatusEnum.PAUSED

            # Create RSA
            rsa = ad_group_ad.ad.responsive_search_ad
            rsa.headlines.extend([text_asset(h) for h in ad_data['headlines']])
            rsa.descriptions.extend(
                [text_asset(d) for d in ad_data['descriptions']]
            )

            # Set final URLs
            ad_group_ad.ad.final_urls.extend(ad_data.get('final_urls', []))
            return operation

        def describe(
            index: int, ad_data: Dict[str, Any], resource_name: str
        ) -> Dict[str, Any]:
            return {
                'index': index,
                'ad_group_id': ad_data['ad_group_id'],
                'resource_name': resource_name,
                'ad_id': resource_name.split('~')[-1],
                'status': 'success'
            }

        def send(operations: List[Any]) -> Any:
            return self._send(
                ad_group_ad_service.mutate_ad_group_ads,
                "MutateAdGroupAdsRequest",
                customer_id,
                operations,
            )

        return self._run_batch(
            ads, build, send, describe, 'ad_group_id', 'ad_group_id'
        )

    # ------------------------------------------------------------------
    # Batch updates
    # ------------------------------------------------------------------

    def batch_update_budgets(
        self,
        customer_id: str,
        budget_updates: List[Dict[str, Any]]
    ) -> BatchResult:
        """Update budgets for multiple campaigns.

        Each update requires 'campaign_id' and 'budget_amount'. Updates are
        applied one campaign at a time (one lookup of the campaign's budget
        plus one budget mutate), so a failure affects only that campaign.

        Args:
            customer_id: Customer ID (without hyphens)
            budget_updates: List of budget update configurations

        Returns:
            BatchResult with success/failure details
        """
        budget_service = self.client.get_service("CampaignBudgetService")
        ga_service = self.client.get_service("GoogleAdsService")

        results: List[Dict[str, Any]] = []
        errors: List[Dict[str, Any]] = []

        for index, update in enumerate(budget_updates):
            try:
                campaign_id = self._numeric_id(update['campaign_id'])

                # Get current budget resource name
                query = (
                    "SELECT campaign.campaign_budget FROM campaign "
                    f"WHERE campaign.id = {campaign_id}"
                )
                rows = ga_service.search(customer_id=customer_id, query=query)
                row = next(iter(rows), None)
                if row is None:
                    raise LookupError(f"Campaign {campaign_id} not found")

                operation = self.client.get_type("CampaignBudgetOperation")
                budget = operation.update
                budget.resource_name = row.campaign.campaign_budget
                budget.amount_micros = self._to_micros(update['budget_amount'])
                # The mask belongs on the budget operation that is sent.
                self._set_update_mask(operation, "amount_micros")

                budget_service.mutate_campaign_budgets(
                    customer_id=customer_id,
                    operations=[operation]
                )
            except Exception as exc:
                # Boundary: report the failed campaign and keep going.
                errors.append({
                    'index': index,
                    'campaign_id': self._label(update, 'campaign_id'),
                    'error': f"{type(exc).__name__}: {exc}"
                })
            else:
                results.append({
                    'campaign_id': update['campaign_id'],
                    'new_budget': update['budget_amount'],
                    'status': 'success'
                })

        total = len(budget_updates)
        succeeded = len(results)
        failed = total - succeeded
        return BatchResult(
            total=total,
            succeeded=succeeded,
            failed=failed,
            status=self._overall_status(succeeded, failed),
            results=results,
            errors=errors
        )

    def batch_update_bids(
        self,
        customer_id: str,
        bid_updates: List[Dict[str, Any]]
    ) -> BatchResult:
        """Update CPC bids for multiple keywords or ad groups.

        The 'entity_type' of the first update ('keyword' by default, anything
        else means ad group) decides how the whole list is treated. Keyword
        updates require 'ad_group_id', 'criterion_id' and 'cpc_bid'; ad group
        updates require 'ad_group_id' and 'cpc_bid'.

        Args:
            customer_id: Customer ID (without hyphens)
            bid_updates: List of bid update configurations

        Returns:
            BatchResult with success/failure details
        """
        # Determine if updating keywords or ad groups
        first = bid_updates[0] if bid_updates else {}
        entity_type = first.get('entity_type', 'keyword')

        if entity_type == 'keyword':
            service = self.client.get_service("AdGroupCriterionService")
            operation_type = "AdGroupCriterionOperation"
            request_type = "MutateAdGroupCriteriaRequest"
            mutate = service.mutate_ad_group_criteria
            id_field = 'criterion_id'

            def resource_name_for(update: Dict[str, Any]) -> str:
                return service.ad_group_criterion_path(
                    customer_id, update['ad_group_id'], update['criterion_id']
                )
        else:  # ad_group
            service = self.client.get_service("AdGroupService")
            operation_type = "AdGroupOperation"
            request_type = "MutateAdGroupsRequest"
            mutate = service.mutate_ad_groups
            id_field = 'ad_group_id'

            def resource_name_for(update: Dict[str, Any]) -> str:
                return service.ad_group_path(customer_id, update['ad_group_id'])

        def build(update: Dict[str, Any]) -> Any:
            operation = self.client.get_type(operation_type)
            entity = operation.update
            entity.resource_name = resource_name_for(update)
            entity.cpc_bid_micros = self._to_micros(update['cpc_bid'])
            self._set_update_mask(operation, "cpc_bid_micros")
            return operation

        def describe(
            index: int, update: Dict[str, Any], resource_name: str
        ) -> Dict[str, Any]:
            return {
                id_field: update[id_field],
                'new_bid': update['cpc_bid'],
                'status': 'success'
            }

        def send(operations: List[Any]) -> Any:
            return self._send(mutate, request_type, customer_id, operations)

        return self._run_batch(
            bid_updates, build, send, describe, id_field, id_field
        )

    def batch_status_change(
        self,
        customer_id: str,
        entity_type: str,
        status_updates: List[Dict[str, Any]]
    ) -> BatchResult:
        """Change status for multiple entities (campaigns, ad groups, keywords, ads).

        Each update requires 'entity_id' and 'status'; keyword and ad updates
        additionally require 'ad_group_id'.

        Args:
            customer_id: Customer ID (without hyphens)
            entity_type: Type of entity (campaign, ad_group, keyword, ad)
            status_updates: List of status update configurations

        Returns:
            BatchResult with success/failure details. An unsupported
            entity_type yields a FAILED result without contacting the API.
        """
        target = self._STATUS_TARGETS.get(entity_type)
        if target is None:
            return BatchResult(
                total=len(status_updates),
                succeeded=0,
                failed=len(status_updates),
                status=OperationStatus.FAILED,
                results=[],
                errors=[{'error': f'Unsupported entity type: {entity_type}'}]
            )

        (service_name, operation_type, enum_name, path_name,
         mutate_name, request_type, needs_ad_group) = target
        service = self.client.get_service(service_name)
        status_enum = getattr(self.client.enums, enum_name)
        build_path = getattr(service, path_name)
        mutate = getattr(service, mutate_name)

        def build(update: Dict[str, Any]) -> Any:
            operation = self.client.get_type(operation_type)
            entity = operation.update
            if needs_ad_group:
                entity.resource_name = build_path(
                    customer_id, update['ad_group_id'], update['entity_id']
                )
            else:
                entity.resource_name = build_path(
                    customer_id, update['entity_id']
                )
            entity.status = status_enum[update['status']]
            self._set_update_mask(operation, "status")
            return operation

        def describe(
            index: int, update: Dict[str, Any], resource_name: str
        ) -> Dict[str, Any]:
            return {
                'entity_id': update['entity_id'],
                'new_status': update['status'],
                'status': 'success'
            }

        def send(operations: List[Any]) -> Any:
            return self._send(mutate, request_type, customer_id, operations)

        return self._run_batch(
            status_updates, build, send, describe, 'entity_id', 'entity_id'
        )

    # ------------------------------------------------------------------
    # CSV import/export
    # ------------------------------------------------------------------

    def export_to_csv(
        self,
        customer_id: str,
        entity_type: str,
        campaign_id: Optional[str] = None
    ) -> str:
        """Export account structure to CSV format.

        Args:
            customer_id: Customer ID (without hyphens)
            entity_type: Type to export ('campaigns' or 'keywords')
            campaign_id: Optional campaign filter (applied to keywords only)

        Returns:
            CSV string with a header row, or an empty string for an
            entity_type that is not supported.

        Raises:
            ValueError: If campaign_id is given but is not numeric.
        """
        ga_service = self.client.get_service("GoogleAdsService")

        if entity_type == 'campaigns':
            query = """
                SELECT
                    campaign.id,
                    campaign.name,
                    campaign.status,
                    campaign_budget.amount_micros,
                    campaign.advertising_channel_type
                FROM campaign
                ORDER BY campaign.name
            """
            header = ['Campaign ID', 'Campaign Name', 'Status', 'Budget', 'Type']

            def to_record(row: Any) -> List[Any]:
                return [
                    row.campaign.id,
                    row.campaign.name,
                    row.campaign.status.name,
                    row.campaign_budget.amount_micros / _MICROS_PER_UNIT,
                    row.campaign.advertising_channel_type.name
                ]

        elif entity_type == 'keywords':
            # The ID ends up inside the query text, so only digits are let
            # through.
            campaign_filter = ''
            if campaign_id:
                campaign_filter = (
                    f'WHERE campaign.id = {self._numeric_id(campaign_id)}'
                )
            query = f"""
                SELECT
                    campaign.id,
                    campaign.name,
                    ad_group.id,
                    ad_group.name,
                    ad_group_criterion.criterion_id,
                    ad_group_criterion.keyword.text,
                    ad_group_criterion.keyword.match_type,
                    ad_group_criterion.status,
                    ad_group_criterion.cpc_bid_micros
                FROM keyword_view
                {campaign_filter}
                ORDER BY campaign.name, ad_group.name, ad_group_criterion.keyword.text
            """
            header = [
                'Campaign ID', 'Campaign Name', 'Ad Group ID', 'Ad Group Name',
                'Keyword ID', 'Keyword Text', 'Match Type', 'Status', 'CPC Bid'
            ]

            def to_record(row: Any) -> List[Any]:
                criterion = row.ad_group_criterion
                return [
                    row.campaign.id,
                    row.campaign.name,
                    row.ad_group.id,
                    row.ad_group.name,
                    criterion.criterion_id,
                    criterion.keyword.text,
                    criterion.keyword.match_type.name,
                    criterion.status.name,
                    criterion.cpc_bid_micros / _MICROS_PER_UNIT
                ]

        else:
            return ""

        response = ga_service.search(customer_id=customer_id, query=query)

        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(header)
        writer.writerows(to_record(row) for row in response)
        return output.getvalue()

    def import_from_csv(
        self,
        customer_id: str,
        entity_type: str,
        csv_data: str
    ) -> BatchResult:
        """Import entities from CSV format.

        Campaign rows need 'Campaign Name' and 'Budget' and may carry 'Type'
        and 'Status'. Keyword rows need 'Ad Group ID' and 'Keyword Text' and
        may carry 'Match Type' and 'CPC Bid'. Blank optional cells fall back
        to the same defaults as missing columns.

        Args:
            customer_id: Customer ID (without hyphens)
            entity_type: Type to import ('campaigns' or 'keywords')
            csv_data: CSV string

        Returns:
            BatchResult with import details. An unsupported entity_type
            yields a FAILED result with total 0.

        Raises:
            KeyError: If a required column is missing.
            ValueError: If 'Budget' or 'CPC Bid' is not a number. Rows are
                parsed before anything is sent, so no entity is created in
                that case.
        """
        rows = list(csv.DictReader(io.StringIO(csv_data)))

        if entity_type == 'campaigns':
            campaigns = [
                {
                    'name': row['Campaign Name'],
                    'budget_amount': float(row['Budget']),
                    # ``or`` also covers cells that are present but empty.
                    'type': row.get('Type') or 'SEARCH',
                    'status': row.get('Status') or 'PAUSED'
                }
                for row in rows
            ]
            return self.batch_create_campaigns(customer_id, campaigns)

        if entity_type == 'keywords':
            keywords = []
            for row in rows:
                keyword = {
                    'ad_group_id': row['Ad Group ID'],
                    'text': row['Keyword Text'],
                    'match_type': row.get('Match Type') or 'BROAD'
                }
                # Leave the bid out entirely when the cell is blank instead of
                # passing None on to the keyword builder.
                bid_text = (row.get('CPC Bid') or '').strip()
                if bid_text:
                    keyword['cpc_bid'] = float(bid_text)
                keywords.append(keyword)
            return self.batch_add_keywords(customer_id, keywords)

        return BatchResult(
            total=0,
            succeeded=0,
            failed=0,
            status=OperationStatus.FAILED,
            results=[],
            errors=[{'error': f'Unsupported entity type: {entity_type}'}]
        )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johnoconnor0/google-ads-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.