Skip to main content
Glama
johnoconnor0

Google Ads MCP Server

by johnoconnor0
google_ads_mcp_v2.py (20.6 kB)
""" Google Ads MCP Server v2.0 Enhanced MCP server with: - Automatic token refresh - Intelligent caching - Error handling with retry logic - Performance tracking - Audit logging - Query optimization - Response streaming Requirements: - google-ads>=25.0.0 - mcp>=1.1.0 - httpx>=0.27.0 - pydantic>=2.0.0 - google-auth-oauthlib>=1.0.0 - PyYAML>=6.0.0 - cachetools>=5.3.0 - redis>=5.0.0 (optional) """ from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, Field, field_validator from typing import Optional, List, Dict, Any, Literal from enum import Enum import json import asyncio # Import our new modules from auth_manager import get_auth_manager, AuthenticationError from error_handler import with_retry, with_rate_limit_handling, ErrorHandler, safe_execute from config_manager import get_config_manager, get_config from cache_manager import get_cache_manager, cached, ResourceType, initialize_cache from logger import main_logger, performance_logger, audit_logger, get_logger from query_optimizer import get_query_optimizer, validate_query, optimize_query from response_handler import ResponseFormatter, stream_large_query, paginate_results # Initialize logger for this module logger = get_logger(__name__) # Initialize MCP server mcp = FastMCP("google_ads_mcp_v2") # Initialize configuration (loads from config.yaml or environment variables) try: config = get_config_manager() logger.info("Configuration loaded successfully") except Exception as e: logger.warning(f"Failed to load configuration, using defaults: {e}") config = get_config_manager() # Will use defaults # Initialize auth manager auth_manager = get_auth_manager() # Initialize cache manager cache_backend = config.config.performance.cache.backend if cache_backend.value == "redis" and config.config.performance.cache.redis_url: cache_manager = initialize_cache( backend=cache_backend, redis_url=config.config.performance.cache.redis_url, default_ttl=config.config.performance.cache.ttl ) else: cache_manager = 
initialize_cache( backend=cache_backend, max_size=config.config.performance.cache.max_size, default_ttl=config.config.performance.cache.ttl ) logger.info(f"Cache initialized with backend: {cache_backend.value}") # Constants CHARACTER_LIMIT = config.config.character_limit DEFAULT_PAGE_SIZE = config.config.default_page_size MAX_PAGE_SIZE = config.config.max_page_size # ============================================================================ # Enums # ============================================================================ class ResponseFormat(str, Enum): """Output format for tool responses.""" MARKDOWN = "markdown" JSON = "json" class DateRange(str, Enum): """Predefined date ranges for queries.""" TODAY = "TODAY" YESTERDAY = "YESTERDAY" LAST_7_DAYS = "LAST_7_DAYS" LAST_14_DAYS = "LAST_14_DAYS" LAST_30_DAYS = "LAST_30_DAYS" THIS_MONTH = "THIS_MONTH" LAST_MONTH = "LAST_MONTH" LAST_90_DAYS = "LAST_90_DAYS" class CampaignStatus(str, Enum): """Campaign status options.""" ENABLED = "ENABLED" PAUSED = "PAUSED" REMOVED = "REMOVED" # ============================================================================ # Helper Functions # ============================================================================ def format_micros(micros: int) -> float: """Convert micros to standard currency units.""" return micros / 1_000_000 def format_percentage(value: float) -> str: """Format a decimal as a percentage.""" return f"{value * 100:.2f}%" @with_retry(max_attempts=3, backoff_base=2.0) async def execute_query( customer_id: str, query: str, use_cache: bool = True ) -> List[Any]: """ Execute a GAQL query with caching and retry logic. 
Args: customer_id: Customer ID (without hyphens) query: GAQL query string use_cache: Whether to use caching Returns: List of row results """ with performance_logger.track_operation('execute_query', customer_id=customer_id): # Get client client = auth_manager.get_client() ga_service = client.get_service("GoogleAdsService") try: # Validate and optimize query query_analysis = validate_query(query) if not query_analysis.is_valid: error_msg = "Query validation failed:\n" + "\n".join(query_analysis.errors) logger.error(error_msg) raise ValueError(error_msg) # Log warnings for warning in query_analysis.warnings: logger.warning(f"Query warning: {warning}") # Execute query response = ga_service.search(customer_id=customer_id, query=query) results = list(response) logger.info(f"Query returned {len(results)} results for customer {customer_id}") return results except Exception as e: error_msg = ErrorHandler.handle_error(e, context="execute_query") logger.error(error_msg) raise # ============================================================================ # MCP Tools - Authentication # ============================================================================ @mcp.tool() def google_ads_initialize( developer_token: str, client_id: str, client_secret: str, refresh_token: str, login_customer_id: Optional[str] = None, client_key: str = "default" ) -> str: """ Initialize Google Ads API with OAuth credentials (v2 with auto token refresh). 
Args: developer_token: Google Ads developer token client_id: OAuth2 client ID client_secret: OAuth2 client secret refresh_token: OAuth2 refresh token (auto-refreshes when expired) login_customer_id: Optional MCC account ID (without hyphens) client_key: Unique identifier for this session (default: "default") Returns: Success message with session information """ try: # Initialize OAuth with auth manager session_key = auth_manager.initialize_oauth( developer_token=developer_token, client_id=client_id, client_secret=client_secret, refresh_token=refresh_token, login_customer_id=login_customer_id, client_key=client_key ) # Validate credentials is_valid = auth_manager.validate_credentials(client_key) if not is_valid: return "❌ Initialization failed: Could not validate credentials" logger.info(f"Google Ads client initialized successfully: {client_key}") return ( f"✅ Google Ads API initialized successfully!\n\n" f"Session Key: {session_key}\n" f"MCC Account: {login_customer_id or 'Not specified'}\n" f"Token Refresh: Automatic (enabled)\n" f"Multi-Account Support: Yes\n\n" f"You can now use other Google Ads tools to query and manage campaigns." ) except AuthenticationError as e: error_msg = str(e) logger.error(f"Authentication failed: {error_msg}") return f"❌ Authentication Error: {error_msg}" except Exception as e: error_msg = ErrorHandler.handle_error(e, context="google_ads_initialize") return f"❌ Initialization Error: {error_msg}" @mcp.tool() def google_ads_service_account_init( developer_token: str, json_key_file_path: str, login_customer_id: Optional[str] = None, client_key: str = "service_account" ) -> str: """ Initialize Google Ads API with service account (for automated processes). 
Args: developer_token: Google Ads developer token json_key_file_path: Path to service account JSON key file login_customer_id: Optional MCC account ID (without hyphens) client_key: Unique identifier for this session Returns: Success message with session information """ try: session_key = auth_manager.initialize_service_account( developer_token=developer_token, json_key_file_path=json_key_file_path, login_customer_id=login_customer_id, client_key=client_key ) logger.info(f"Service account initialized: {client_key}") return ( f"✅ Service Account initialized successfully!\n\n" f"Session Key: {session_key}\n" f"Key File: {json_key_file_path}\n" f"MCC Account: {login_customer_id or 'Not specified'}\n\n" f"Service account authentication is ideal for automated scripts and scheduled tasks." ) except AuthenticationError as e: return f"❌ Service Account Error: {str(e)}" except Exception as e: error_msg = ErrorHandler.handle_error(e, context="service_account_init") return f"❌ Initialization Error: {error_msg}" @mcp.tool() def google_ads_list_auth_sessions() -> str: """ List all authenticated Google Ads sessions. Returns: List of active sessions with metadata """ try: clients = auth_manager.list_clients() if not clients: return "No authenticated sessions found. Use google_ads_initialize first." output = "# Authenticated Google Ads Sessions\n\n" for key, info in clients.items(): current_marker = " (CURRENT)" if info['is_current'] else "" auth_type = "OAuth2" if info['has_token_manager'] else "Service Account" output += f"## {key}{current_marker}\n" output += f"- **Auth Type**: {auth_type}\n" output += f"- **Status**: Active\n\n" output += f"\n**Total Sessions**: {len(clients)}\n" output += "\nUse `google_ads_switch_session` to switch between sessions." 
return output except Exception as e: error_msg = ErrorHandler.handle_error(e, context="list_auth_sessions") return f"❌ Error: {error_msg}" @mcp.tool() def google_ads_switch_session(client_key: str) -> str: """ Switch to a different authenticated session. Args: client_key: Session key to switch to Returns: Success message """ try: auth_manager.switch_client(client_key) logger.info(f"Switched to session: {client_key}") return f"✅ Switched to session: {client_key}\n\nAll subsequent operations will use this session." except AuthenticationError as e: return f"❌ Error: {str(e)}" except Exception as e: error_msg = ErrorHandler.handle_error(e, context="switch_session") return f"❌ Error: {error_msg}" @mcp.tool() def google_ads_get_cache_stats() -> str: """ Get cache statistics and performance metrics. Returns: Cache statistics in markdown format """ try: stats = cache_manager.get_stats() output = "# Cache Statistics\n\n" output += f"**Backend**: {stats.get('backend', 'unknown').upper()}\n" output += f"**Hit Rate**: {stats.get('hit_rate', '0%')}\n" output += f"**Total Requests**: {stats.get('total_requests', 0)}\n" output += f"**Cache Hits**: {stats.get('hits', 0)}\n" output += f"**Cache Misses**: {stats.get('misses', 0)}\n" output += f"**Cache Sets**: {stats.get('sets', 0)}\n" output += f"**Errors**: {stats.get('errors', 0)}\n\n" if 'size' in stats: output += f"**Current Size**: {stats['size']} items\n" output += f"**Max Size**: {stats.get('max_size', 'N/A')}\n\n" if stats.get('backend') == 'redis': output += f"**Connected Clients**: {stats.get('connected_clients', 0)}\n" output += f"**Memory Used**: {stats.get('used_memory_human', 'unknown')}\n" output += f"**Total Keys**: {stats.get('total_keys', 0)}\n\n" output += "Cache is automatically used for read operations to improve performance." 
return output except Exception as e: error_msg = ErrorHandler.handle_error(e, context="get_cache_stats") return f"❌ Error: {error_msg}" # ============================================================================ # MCP Tools - Account Management # ============================================================================ @mcp.tool() async def google_ads_list_accounts( response_format: ResponseFormat = ResponseFormat.MARKDOWN ) -> str: """ List all accessible Google Ads accounts. Args: response_format: Output format (markdown or json) Returns: List of accounts with details """ with performance_logger.track_operation('list_accounts'): try: client = auth_manager.get_client() customer_service = client.get_service("CustomerService") # Get accessible customers accessible_customers = customer_service.list_accessible_customers() resource_names = accessible_customers.resource_names accounts = [] for resource_name in resource_names: customer_id = resource_name.split("/")[1] # Get customer details query = f""" SELECT customer.id, customer.descriptive_name, customer.currency_code, customer.time_zone, customer.manager, customer.status FROM customer WHERE customer.id = {customer_id} """ results = await execute_query(customer_id, query, use_cache=True) if results: row = results[0] accounts.append({ 'id': str(row.customer.id), 'name': row.customer.descriptive_name or 'Unnamed Account', 'currency_code': row.customer.currency_code, 'time_zone': row.customer.time_zone, 'manager': row.customer.manager, 'status': row.customer.status.name }) # Audit log audit_logger.log_api_call( customer_id="all", operation="list_accounts", resource_type="customer", action="read", result="success", details={'account_count': len(accounts)} ) # Format response if response_format == ResponseFormat.JSON: return json.dumps(accounts, indent=2) else: return format_account_list_markdown(accounts) except Exception as e: error_msg = ErrorHandler.handle_error(e, context="list_accounts") return f"❌ Error 
listing accounts: {error_msg}" def format_account_list_markdown(accounts: List[Dict]) -> str: """Format account list as markdown.""" if not accounts: return "No accounts found." output = "# Google Ads Accounts\n\n" for account in accounts: output += f"## {account['name']}\n" output += f"- **Customer ID**: {account['id']}\n" output += f"- **Currency**: {account['currency_code']}\n" output += f"- **Timezone**: {account['time_zone']}\n" output += f"- **Status**: {account['status']}\n" if account.get('manager'): output += f"- **Manager Account**: Yes\n" output += "\n" output += f"**Total Accounts**: {len(accounts)}\n" return output # ============================================================================ # Register Additional Tool Modules # ============================================================================ # Import and register campaign management tools try: from mcp_tools_campaigns import register_campaign_tools register_campaign_tools(mcp) logger.info("Campaign management tools registered (15 tools)") except Exception as e: logger.warning(f"Failed to register campaign tools: {e}") # Import and register ad group management tools try: from mcp_tools_ad_groups import register_ad_group_tools register_ad_group_tools(mcp) logger.info("Ad group management tools registered (8 tools)") except Exception as e: logger.warning(f"Failed to register ad group tools: {e}") # Import and register keyword management tools try: from mcp_tools_keywords import register_keyword_tools register_keyword_tools(mcp) logger.info("Keyword management tools registered (12 tools)") except Exception as e: logger.warning(f"Failed to register keyword tools: {e}") # Import and register ad management tools try: from mcp_tools_ads import register_ad_tools register_ad_tools(mcp) logger.info("Ad management tools registered (8 tools)") except Exception as e: logger.warning(f"Failed to register ad tools: {e}") # Import and register bidding strategy management tools try: from mcp_tools_bidding 
import register_bidding_tools register_bidding_tools(mcp) logger.info("Bidding strategy management tools registered (12 tools)") except Exception as e: logger.warning(f"Failed to register bidding tools: {e}") # Import and register automation and optimization tools try: from mcp_tools_automation import register_automation_tools register_automation_tools(mcp) logger.info("Automation and optimization tools registered (10 tools)") except Exception as e: logger.warning(f"Failed to register automation tools: {e}") # Import and register audience and remarketing tools try: from mcp_tools_audiences import register_audience_tools register_audience_tools(mcp) logger.info("Audience and remarketing tools registered (10 tools)") except Exception as e: logger.warning(f"Failed to register audience tools: {e}") # Import and register conversion tracking tools try: from mcp_tools_conversions import register_conversion_tools register_conversion_tools(mcp) logger.info("Conversion tracking tools registered (10 tools)") except Exception as e: logger.warning(f"Failed to register conversion tools: {e}") # Import and register reporting and analytics tools try: from mcp_tools_reporting import register_reporting_tools register_reporting_tools(mcp) logger.info("Reporting and analytics tools registered (6 tools)") except Exception as e: logger.warning(f"Failed to register reporting tools: {e}") # Import and register insights and recommendations tools try: from mcp_tools_insights import register_insights_tools register_insights_tools(mcp) logger.info("Insights and recommendations tools registered (8 tools)") except Exception as e: logger.warning(f"Failed to register insights tools: {e}") # Import and register batch operation tools try: from mcp_tools_batch import register_batch_tools register_batch_tools(mcp) logger.info("Batch operation tools registered (11 tools)") except Exception as e: logger.warning(f"Failed to register batch tools: {e}") # Import and register Shopping and Performance Max 
tools try: from mcp_tools_shopping_pmax import register_shopping_pmax_tools register_shopping_pmax_tools(mcp) logger.info("Shopping and Performance Max tools registered (9 tools)") except Exception as e: logger.warning(f"Failed to register shopping/PMax tools: {e}") # Import and register ad extension tools try: from mcp_tools_extensions import register_extension_tools register_extension_tools(mcp) logger.info("Ad extension tools registered (7 tools)") except Exception as e: logger.warning(f"Failed to register extension tools: {e}") # Import and register local/app campaign tools try: from mcp_tools_local_app import register_local_app_tools register_local_app_tools(mcp) logger.info("Local and App campaign tools registered (6 tools)") except Exception as e: logger.warning(f"Failed to register local/app tools: {e}") # ============================================================================ # Entry Point # ============================================================================ if __name__ == "__main__": logger.info("Starting Google Ads MCP Server v2.0") logger.info(f"Cache backend: {cache_backend.value}") logger.info(f"Features enabled: {config.config.features.model_dump()}") # Run the MCP server mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johnoconnor0/google-ads-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.