Skip to main content
Glama
johnoconnor0

Google Ads MCP Server

by johnoconnor0
response_handler.py (13 kB)
""" Google Ads Response Handler Response processing with: - Automatic pagination for large datasets - Streaming support for memory efficiency - Progress indicators - Response formatting (markdown/JSON) - Data transformation """ import asyncio import logging from typing import AsyncIterator, List, Dict, Any, Optional, Callable from dataclasses import dataclass from google.ads.googleads.client import GoogleAdsClient logger = logging.getLogger(__name__) @dataclass class PaginationConfig: """Pagination configuration.""" page_size: int = 1000 max_pages: Optional[int] = None max_total_results: Optional[int] = None @dataclass class StreamProgress: """Progress information for streaming operations.""" current_page: int total_fetched: int has_more: bool estimated_total: Optional[int] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { 'current_page': self.current_page, 'total_fetched': self.total_fetched, 'has_more': self.has_more, 'estimated_total': self.estimated_total, 'progress_pct': self._calculate_progress() } def _calculate_progress(self) -> Optional[float]: """Calculate progress percentage.""" if self.estimated_total and self.estimated_total > 0: return min(100.0, (self.total_fetched / self.estimated_total) * 100) return None class ResponseStream: """ Handles streaming of large response datasets. """ def __init__( self, client: GoogleAdsClient, customer_id: str, query: str, page_size: int = 1000, max_results: Optional[int] = None ): """ Initialize response stream. 
Args: client: Google Ads client customer_id: Customer ID query: GAQL query page_size: Number of results per page max_results: Maximum total results to fetch """ self.client = client self.customer_id = customer_id self.query = query self.page_size = page_size self.max_results = max_results self.current_page = 0 self.total_fetched = 0 async def stream( self, transform_fn: Optional[Callable[[Any], Dict[str, Any]]] = None, progress_callback: Optional[Callable[[StreamProgress], None]] = None ) -> AsyncIterator[Dict[str, Any]]: """ Stream query results page by page. Args: transform_fn: Optional function to transform each row progress_callback: Optional callback for progress updates Yields: Transformed result rows """ ga_service = self.client.get_service("GoogleAdsService") try: # Execute query with streaming stream = ga_service.search_stream( customer_id=self.customer_id, query=self.query ) for batch in stream: self.current_page += 1 # Process each result in the batch for row in batch.results: # Check max results limit if self.max_results and self.total_fetched >= self.max_results: logger.info(f"Reached max results limit: {self.max_results}") return # Transform row if function provided if transform_fn: result = transform_fn(row) else: result = self._default_transform(row) self.total_fetched += 1 # Yield result yield result # Report progress if progress_callback: progress = StreamProgress( current_page=self.current_page, total_fetched=self.total_fetched, has_more=True # We don't know until stream ends ) progress_callback(progress) # Log progress if self.total_fetched % 1000 == 0: logger.info(f"Streamed {self.total_fetched} results across {self.current_page} pages") # Final progress update if progress_callback: final_progress = StreamProgress( current_page=self.current_page, total_fetched=self.total_fetched, has_more=False ) progress_callback(final_progress) logger.info( f"Stream complete: {self.total_fetched} total results " f"across {self.current_page} pages" ) except 
Exception as e: logger.error(f"Error streaming results: {e}") raise def _default_transform(self, row: Any) -> Dict[str, Any]: """ Default transformation (converts protobuf to dict). Args: row: Result row Returns: Dictionary representation """ # This is a placeholder - actual implementation would # properly convert protobuf objects to dictionaries return {"raw": str(row)} async def collect_all( self, transform_fn: Optional[Callable[[Any], Dict[str, Any]]] = None ) -> List[Dict[str, Any]]: """ Collect all streamed results into a list. Args: transform_fn: Optional transform function Returns: List of all results Warning: Use with caution for large datasets """ results = [] async for result in self.stream(transform_fn=transform_fn): results.append(result) return results class ResponseFormatter: """Formats API responses for different output types.""" @staticmethod def to_markdown( data: List[Dict[str, Any]], title: str = "Results", columns: Optional[List[str]] = None ) -> str: """ Format data as markdown table. Args: data: List of result dictionaries title: Title for the output columns: Optional list of columns to include (None = all) Returns: Markdown formatted string """ if not data: return f"# {title}\n\nNo results found." output = [f"# {title}\n"] # Determine columns if columns is None: # Get all unique keys from data all_keys = set() for row in data: all_keys.update(row.keys()) columns = sorted(all_keys) # Create table header header = "| " + " | ".join(columns) + " |" separator = "| " + " | ".join(["---"] * len(columns)) + " |" output.append(header) output.append(separator) # Add rows for row in data: values = [str(row.get(col, "")) for col in columns] output.append("| " + " | ".join(values) + " |") # Add summary output.append(f"\n**Total Results:** {len(data)}") return "\n".join(output) @staticmethod def to_summary( data: List[Dict[str, Any]], title: str = "Summary", metric_fields: Optional[List[str]] = None ) -> str: """ Format data as summary statistics. 
Args: data: List of result dictionaries title: Title for the summary metric_fields: Fields to calculate statistics for Returns: Markdown formatted summary """ if not data: return f"# {title}\n\nNo data available." output = [f"# {title}\n"] # Count output.append(f"**Total Records:** {len(data)}\n") # Calculate statistics for metric fields if metric_fields: output.append("## Metrics\n") for field in metric_fields: values = [ row.get(field, 0) for row in data if field in row and isinstance(row[field], (int, float)) ] if values: total = sum(values) avg = total / len(values) if values else 0 max_val = max(values) min_val = min(values) output.append(f"### {field}") output.append(f"- **Total:** {total:,.2f}") output.append(f"- **Average:** {avg:,.2f}") output.append(f"- **Max:** {max_val:,.2f}") output.append(f"- **Min:** {min_val:,.2f}") output.append("") return "\n".join(output) @staticmethod def truncate( text: str, max_length: int = 25000, truncate_message: str = "\n\n... (Response truncated. Use filters to reduce data size.)" ) -> str: """ Truncate response if it exceeds max length. Args: text: Text to truncate max_length: Maximum length truncate_message: Message to append if truncated Returns: Truncated text """ if len(text) <= max_length: return text truncate_at = max_length - len(truncate_message) return text[:truncate_at] + truncate_message class PaginatedResponse: """Handles paginated responses.""" def __init__( self, data: List[Dict[str, Any]], page: int = 1, page_size: int = 50, total: Optional[int] = None ): """ Initialize paginated response. 
Args: data: All data page: Current page number (1-indexed) page_size: Items per page total: Total number of items (if known) """ self.data = data self.page = page self.page_size = page_size self.total = total or len(data) @property def total_pages(self) -> int: """Calculate total pages.""" return (self.total + self.page_size - 1) // self.page_size @property def has_previous(self) -> bool: """Check if there's a previous page.""" return self.page > 1 @property def has_next(self) -> bool: """Check if there's a next page.""" return self.page < self.total_pages def get_page(self, page: Optional[int] = None) -> List[Dict[str, Any]]: """ Get specific page of data. Args: page: Page number (uses current if None) Returns: Page data """ page_num = page or self.page if page_num < 1 or page_num > self.total_pages: return [] start_idx = (page_num - 1) * self.page_size end_idx = start_idx + self.page_size return self.data[start_idx:end_idx] def to_dict(self, include_data: bool = True) -> Dict[str, Any]: """ Convert to dictionary. Args: include_data: Whether to include data in output Returns: Dictionary representation """ result = { 'page': self.page, 'page_size': self.page_size, 'total': self.total, 'total_pages': self.total_pages, 'has_previous': self.has_previous, 'has_next': self.has_next } if include_data: result['data'] = self.get_page() return result async def stream_large_query( client: GoogleAdsClient, customer_id: str, query: str, page_size: int = 1000, max_results: Optional[int] = None, transform_fn: Optional[Callable] = None ) -> AsyncIterator[Dict[str, Any]]: """ Stream results from a large query. 
Args: client: Google Ads client customer_id: Customer ID query: GAQL query page_size: Results per page max_results: Maximum total results transform_fn: Optional transform function Yields: Result rows Example: async for result in stream_large_query(client, customer_id, query): process(result) """ stream = ResponseStream( client=client, customer_id=customer_id, query=query, page_size=page_size, max_results=max_results ) async for result in stream.stream(transform_fn=transform_fn): yield result def paginate_results( data: List[Dict[str, Any]], page: int = 1, page_size: int = 50 ) -> PaginatedResponse: """ Create paginated response from data. Args: data: Full dataset page: Page number page_size: Items per page Returns: Paginated response """ return PaginatedResponse( data=data, page=page, page_size=page_size )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johnoconnor0/google-ads-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.