"""Reporting tools."""
from typing import Any
from mcp.server import Server
from ..google_ads_client import ads_client
# Google Ads reports monetary values in micros (1/1,000,000 of a currency unit).
_MICROS_PER_UNIT = 1_000_000


def _validate_date_range(date_range: str) -> str:
    """Return ``date_range`` if it is a single bare identifier token.

    The value is interpolated directly into a GAQL ``DURING`` clause, so it
    must not contain whitespace, quotes, operators or anything else that
    could extend the query. Whether the token is a date range the API
    actually knows is left for the API to decide.

    Args:
        date_range: Candidate date-range constant, e.g. ``LAST_30_DAYS``.

    Returns:
        The token with surrounding whitespace removed.

    Raises:
        ValueError: If the value is empty or contains characters other than
            ASCII letters, digits and underscores.
    """
    token = str(date_range).strip()
    if not (token.isascii() and token.replace("_", "").isalnum()):
        raise ValueError(
            f"Invalid date_range: {date_range!r} "
            "(expected a constant such as LAST_30_DAYS)"
        )
    return token


def _validate_id(value: Any, field: str) -> str:
    """Return ``value`` as a string of ASCII digits, safe to embed in GAQL.

    Args:
        value: Candidate resource ID (string or integer).
        field: Parameter name, used only in the error message.

    Returns:
        The ID as a digits-only string.

    Raises:
        ValueError: If the value is not purely ASCII digits.
    """
    text = str(value).strip()
    # isascii() guards against non-ASCII characters that str.isdigit() accepts.
    if not (text.isascii() and text.isdigit()):
        raise ValueError(f"Invalid {field}: {value!r} (expected a numeric ID)")
    return text


def _validate_limit(limit: Any) -> int:
    """Return ``limit`` as a positive integer suitable for a ``LIMIT`` clause.

    Args:
        limit: Candidate row limit.

    Returns:
        The limit as an ``int`` greater than zero.

    Raises:
        ValueError: If the value is not convertible to an integer or is not
            strictly positive.
    """
    try:
        count = int(limit)
    except (TypeError, ValueError):
        raise ValueError(
            f"Invalid limit: {limit!r} (expected a positive integer)"
        ) from None
    if count <= 0:
        raise ValueError(f"Invalid limit: {limit!r} (expected a positive integer)")
    return count


def register_report_tools(server: Server) -> None:
    """Register reporting tools with the MCP server.

    Four tools are registered: ``get_campaign_report``,
    ``get_keyword_report``, ``get_search_terms_report`` and
    ``get_performance_metrics``. Each builds a GAQL query, runs it through
    ``ads_client.search`` and returns a plain dictionary. Any failure
    (including invalid arguments) is reported as an ``"error"`` entry in the
    returned dictionary rather than raised.

    NOTE(review): this relies on ``server.tool()`` being available on the
    object passed in - confirm the server type used by callers exposes it.
    NOTE(review): ``ads_client.search`` is called synchronously inside async
    tools; if it performs blocking I/O it will hold up the event loop -
    confirm whether that is acceptable.

    Args:
        server: The MCP server instance.
    """

    @server.tool()
    async def get_campaign_report(
        campaign_id: str | None = None,
        date_range: str = "LAST_30_DAYS",
    ) -> dict[str, Any]:
        """Get campaign performance report.

        Args:
            campaign_id: Optional specific campaign ID (all campaigns if not specified).
            date_range: Date range for the report. Options:
                TODAY, YESTERDAY, LAST_7_DAYS, LAST_30_DAYS, THIS_MONTH, LAST_MONTH

        Returns:
            Dictionary containing campaign performance metrics. On failure the
            dictionary holds an "error" message and an empty "campaigns" list.
        """
        try:
            # Validate before interpolating: these values come from the tool caller.
            query = f"""
                SELECT
                    campaign.id,
                    campaign.name,
                    campaign.status,
                    metrics.impressions,
                    metrics.clicks,
                    metrics.cost_micros,
                    metrics.conversions,
                    metrics.conversions_value,
                    metrics.ctr,
                    metrics.average_cpc
                FROM campaign
                WHERE segments.date DURING {_validate_date_range(date_range)}
            """
            if campaign_id:
                query += f" AND campaign.id = {_validate_id(campaign_id, 'campaign_id')}"
            query += " ORDER BY metrics.cost_micros DESC"

            campaigns = [
                {
                    "campaign_id": str(row.campaign.id),
                    "campaign_name": row.campaign.name,
                    "status": row.campaign.status.name,
                    "impressions": row.metrics.impressions,
                    "clicks": row.metrics.clicks,
                    "cost": row.metrics.cost_micros / _MICROS_PER_UNIT,
                    "conversions": row.metrics.conversions,
                    "conversion_value": row.metrics.conversions_value,
                    "ctr": row.metrics.ctr,
                    "avg_cpc": row.metrics.average_cpc / _MICROS_PER_UNIT,
                }
                for row in ads_client.search(query)
            ]
            return {
                "date_range": date_range,
                "campaigns": campaigns,
                "count": len(campaigns),
            }
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            return {"error": str(e), "campaigns": []}

    @server.tool()
    async def get_keyword_report(
        campaign_id: str | None = None,
        ad_group_id: str | None = None,
        date_range: str = "LAST_30_DAYS",
        limit: int = 100,
    ) -> dict[str, Any]:
        """Get keyword performance report.

        Args:
            campaign_id: Optional campaign ID to filter by.
            ad_group_id: Optional ad group ID to filter by.
            date_range: Date range for the report.
            limit: Maximum number of keywords to return.

        Returns:
            Dictionary containing keyword performance metrics. On failure the
            dictionary holds an "error" message and an empty "keywords" list.
        """
        try:
            query = f"""
                SELECT
                    ad_group_criterion.keyword.text,
                    ad_group_criterion.keyword.match_type,
                    campaign.name,
                    ad_group.name,
                    metrics.impressions,
                    metrics.clicks,
                    metrics.cost_micros,
                    metrics.conversions,
                    metrics.ctr,
                    metrics.average_cpc
                FROM keyword_view
                WHERE segments.date DURING {_validate_date_range(date_range)}
            """
            if campaign_id:
                query += f" AND campaign.id = {_validate_id(campaign_id, 'campaign_id')}"
            if ad_group_id:
                query += f" AND ad_group.id = {_validate_id(ad_group_id, 'ad_group_id')}"
            query += f" ORDER BY metrics.cost_micros DESC LIMIT {_validate_limit(limit)}"

            keywords = [
                {
                    "keyword": row.ad_group_criterion.keyword.text,
                    "match_type": row.ad_group_criterion.keyword.match_type.name,
                    "campaign_name": row.campaign.name,
                    "ad_group_name": row.ad_group.name,
                    "impressions": row.metrics.impressions,
                    "clicks": row.metrics.clicks,
                    "cost": row.metrics.cost_micros / _MICROS_PER_UNIT,
                    "conversions": row.metrics.conversions,
                    "ctr": row.metrics.ctr,
                    "avg_cpc": row.metrics.average_cpc / _MICROS_PER_UNIT,
                }
                for row in ads_client.search(query)
            ]
            return {
                "date_range": date_range,
                "keywords": keywords,
                "count": len(keywords),
            }
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            return {"error": str(e), "keywords": []}

    @server.tool()
    async def get_search_terms_report(
        campaign_id: str | None = None,
        date_range: str = "LAST_30_DAYS",
        limit: int = 100,
    ) -> dict[str, Any]:
        """Get search terms report showing actual search queries.

        Args:
            campaign_id: Optional campaign ID to filter by.
            date_range: Date range for the report.
            limit: Maximum number of search terms to return.

        Returns:
            Dictionary containing search term performance data. On failure the
            dictionary holds an "error" message and an empty "search_terms" list.
        """
        try:
            query = f"""
                SELECT
                    search_term_view.search_term,
                    campaign.name,
                    ad_group.name,
                    metrics.impressions,
                    metrics.clicks,
                    metrics.cost_micros,
                    metrics.conversions
                FROM search_term_view
                WHERE segments.date DURING {_validate_date_range(date_range)}
            """
            if campaign_id:
                query += f" AND campaign.id = {_validate_id(campaign_id, 'campaign_id')}"
            query += f" ORDER BY metrics.impressions DESC LIMIT {_validate_limit(limit)}"

            search_terms = [
                {
                    "search_term": row.search_term_view.search_term,
                    "campaign_name": row.campaign.name,
                    "ad_group_name": row.ad_group.name,
                    "impressions": row.metrics.impressions,
                    "clicks": row.metrics.clicks,
                    "cost": row.metrics.cost_micros / _MICROS_PER_UNIT,
                    "conversions": row.metrics.conversions,
                }
                for row in ads_client.search(query)
            ]
            return {
                "date_range": date_range,
                "search_terms": search_terms,
                "count": len(search_terms),
            }
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            return {"error": str(e), "search_terms": []}

    @server.tool()
    async def get_performance_metrics(
        date_range: str = "LAST_30_DAYS",
    ) -> dict[str, Any]:
        """Get overall account performance metrics summary.

        Args:
            date_range: Date range for the report.

        Returns:
            Dictionary containing aggregated account metrics. On failure the
            dictionary holds an "error" message and an empty "metrics" dict.
        """
        try:
            query = f"""
                SELECT
                    metrics.impressions,
                    metrics.clicks,
                    metrics.cost_micros,
                    metrics.conversions,
                    metrics.conversions_value,
                    metrics.ctr,
                    metrics.average_cpc,
                    metrics.cost_per_conversion
                FROM customer
                WHERE segments.date DURING {_validate_date_range(date_range)}
            """

            # Sum the raw counters over every returned row; ratios are then
            # recomputed from the totals rather than averaged per row.
            total_impressions = 0
            total_clicks = 0
            total_cost_micros = 0
            total_conversions = 0
            total_conversion_value = 0
            for row in ads_client.search(query):
                total_impressions += row.metrics.impressions
                total_clicks += row.metrics.clicks
                total_cost_micros += row.metrics.cost_micros
                total_conversions += row.metrics.conversions
                total_conversion_value += row.metrics.conversions_value

            # Guard each ratio against a zero denominator.
            # NOTE(review): ctr here is scaled by 100, whereas the other
            # report tools pass metrics.ctr through unchanged - confirm the
            # two are meant to use different scales.
            ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0
            avg_cpc = (total_cost_micros / total_clicks) if total_clicks > 0 else 0
            cost_per_conversion = (
                (total_cost_micros / total_conversions) if total_conversions > 0 else 0
            )

            return {
                "date_range": date_range,
                "metrics": {
                    "impressions": total_impressions,
                    "clicks": total_clicks,
                    "cost": total_cost_micros / _MICROS_PER_UNIT,
                    "conversions": total_conversions,
                    "conversion_value": total_conversion_value,
                    "ctr": round(ctr, 2),
                    "avg_cpc": round(avg_cpc / _MICROS_PER_UNIT, 2),
                    "cost_per_conversion": round(
                        cost_per_conversion / _MICROS_PER_UNIT, 2
                    ),
                },
            }
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            return {"error": str(e), "metrics": {}}