Google Search Console MCP
by AminForou
Verified
from typing import Any, Dict, List, Optional
import os
import json
from datetime import datetime, timedelta
import google.auth
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# MCP
from mcp.server.fastmcp import FastMCP
# MCP server instance; the tool functions in this file register on it via @mcp.tool().
mcp = FastMCP("gsc-server")
# Path to a service account JSON key file.
# NOTE(review): get_gsc_service() only loads service-account files; user
# (OAuth) credential JSON is not handled by the code visible here.
# First check if GSC_CREDENTIALS_PATH environment variable is set
# Then try looking in the script directory and current working directory as fallbacks
# The environment variable is read once at import time and is None when unset.
GSC_CREDENTIALS_PATH = os.environ.get("GSC_CREDENTIALS_PATH")
# Directory containing this script; base of the first fallback location.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Candidate credential files, in the order get_gsc_service() tries them.
POSSIBLE_CREDENTIAL_PATHS = [
    GSC_CREDENTIALS_PATH, # First try the environment variable if set
    os.path.join(SCRIPT_DIR, "service_account_credentials.json"),
    os.path.join(os.getcwd(), "service_account_credentials.json"),
    # Add any other potential paths here
]
# Read-only Search Console scope requested when loading credentials.
SCOPES = ["https://www.googleapis.com/auth/webmasters.readonly"]
def get_gsc_service():
    """
    Returns an authorized Search Console service object.

    The entries of POSSIBLE_CREDENTIAL_PATHS are tried in order (environment
    variable first, then the fallback locations). The first file that exists
    and can be turned into a service object wins.

    Raises:
        FileNotFoundError: If no candidate produced a usable service. When a
            file existed but could not be used, the reason is appended to the
            message and the last such error is chained as the cause, so the
            failure is no longer reported as a plain "not found".
    """
    load_failures = []  # (path, exception) for files that exist but failed
    for cred_path in POSSIBLE_CREDENTIAL_PATHS:
        # The env-var entry is None when unset; skip it and missing files.
        if not cred_path or not os.path.exists(cred_path):
            continue
        try:
            creds = service_account.Credentials.from_service_account_file(
                cred_path, scopes=SCOPES
            )
            return build("searchconsole", "v1", credentials=creds)
        except Exception as e:
            # Best effort: move on to the next path, but remember why this
            # one failed instead of discarding the error.
            load_failures.append((cred_path, e))

    # If we get here, none of the paths worked
    fallback_locations = ", ".join(p for p in POSSIBLE_CREDENTIAL_PATHS[1:] if p)
    message = (
        "Credentials file not found. Please set the GSC_CREDENTIALS_PATH environment variable "
        "or place credentials file in one of these locations: "
        f"{fallback_locations}"
    )
    if load_failures:
        details = "; ".join(f"{path}: {err}" for path, err in load_failures)
        message += f"\nThese credential files exist but could not be used: {details}"
        # Same exception type as before so existing callers keep working.
        raise FileNotFoundError(message) from load_failures[-1][1]
    raise FileNotFoundError(message)
@mcp.tool()
async def list_properties() -> str:
    """
    List the Search Console properties returned for the configured credentials.

    Returns one "- <siteUrl> (<permissionLevel>)" line per property, a short
    notice when there are none, or a plain-text error description.
    """
    try:
        gsc = get_gsc_service()
        # The response keeps the properties under "siteEntry"; each entry is
        # read for its "siteUrl" and "permissionLevel".
        listing = gsc.sites().list().execute()
        entries = listing.get("siteEntry", [])
        if not entries:
            return "No Search Console properties found."

        def describe(entry):
            url = entry.get("siteUrl", "Unknown")
            level = entry.get("permissionLevel", "Unknown permission")
            return f"- {url} ({level})"

        return "\n".join(describe(entry) for entry in entries)
    except FileNotFoundError:
        # Missing credentials get setup instructions rather than a raw error.
        setup_steps = (
            "Error: Service account credentials file not found.\n\n"
            "To access Google Search Console, please:\n"
            "1. Create a service account in Google Cloud Console\n"
            "2. Download the JSON credentials file\n"
            "3. Save it as 'service_account_credentials.json' in the same directory as this script\n"
            "4. Share your GSC properties with the service account email"
        )
        return setup_steps
    except Exception as e:
        return f"Error retrieving properties: {str(e)}"
@mcp.tool()
async def get_search_analytics(site_url: str, days: int = 28, dimensions: str = "query") -> str:
    """
    Fetch the top search analytics rows for a property as a text table.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        days: Number of days to look back (default: 28)
        dimensions: Dimensions to group by (default: query). Options: query, page, device, country, date
            You can provide multiple dimensions separated by comma (e.g., "query,page")

    Returns:
        A pipe-separated table (at most 20 rows requested), a "no data"
        notice, or a plain-text error description.
    """
    try:
        gsc = get_gsc_service()

        # Look-back window ending today
        window_end = datetime.now().date()
        window_start = window_end - timedelta(days=days)

        dimension_list = [part.strip() for part in dimensions.split(",")]

        body = {
            "startDate": window_start.strftime("%Y-%m-%d"),
            "endDate": window_end.strftime("%Y-%m-%d"),
            "dimensions": dimension_list,
            "rowLimit": 20  # Limit to top 20 results
        }
        response = gsc.searchanalytics().query(siteUrl=site_url, body=body).execute()

        rows = response.get("rows")
        if not rows:
            return f"No search analytics data found for {site_url} in the last {days} days."

        # One column per requested dimension, followed by the four metrics
        columns = [name.capitalize() for name in dimension_list]
        columns += ["Clicks", "Impressions", "CTR", "Position"]

        table = [
            f"Search analytics for {site_url} (last {days} days):",
            "\n" + "-" * 80 + "\n",
            " | ".join(columns),
            "-" * 80,
        ]

        for row in rows:
            # Dimension values are cut to 100 characters
            cells = [value[:100] for value in row.get("keys", [])]
            cells.extend([
                str(row.get("clicks", 0)),
                str(row.get("impressions", 0)),
                f"{row.get('ctr', 0) * 100:.2f}%",
                f"{row.get('position', 0):.1f}",
            ])
            table.append(" | ".join(cells))

        return "\n".join(table)
    except Exception as e:
        return f"Error retrieving search analytics: {str(e)}"
@mcp.tool()
async def get_site_details(site_url: str) -> str:
    """
    Describe a single Search Console property as plain text.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)

    Returns:
        The permission level plus any verification / ownership fields present
        in the response, or a plain-text error description.
    """
    try:
        gsc = get_gsc_service()
        info = gsc.sites().get(siteUrl=site_url).execute()

        report = [
            f"Site details for {site_url}:",
            "-" * 50,
            f"Permission level: {info.get('permissionLevel', 'Unknown')}",
        ]

        # Verification section, only when the response carries it
        if "siteVerificationInfo" in info:
            verification = info["siteVerificationInfo"]
            report.append(f"Verification state: {verification.get('verificationState', 'Unknown')}")
            optional_fields = (
                ("verifiedUser", "Verified by"),
                ("verificationMethod", "Verification method"),
            )
            for field, label in optional_fields:
                if field in verification:
                    report.append(f"{label}: {verification[field]}")

        # Ownership section, only when the response carries it
        if "ownershipInfo" in info:
            ownership = info["ownershipInfo"]
            report.append("\nOwnership Information:")
            report.append(f"Owner: {ownership.get('owner', 'Unknown')}")
            if "verificationMethod" in ownership:
                report.append(f"Ownership verification: {ownership['verificationMethod']}")

        return "\n".join(report)
    except Exception as e:
        return f"Error retrieving site details: {str(e)}"
@mcp.tool()
async def get_sitemaps(site_url: str) -> str:
    """
    List all sitemaps for a specific Search Console property.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)

    Returns:
        A pipe-separated table with one row per sitemap, a "no sitemaps"
        notice, or a plain-text error description.
    """
    try:
        service = get_gsc_service()
        # Get sitemaps list
        sitemaps = service.sitemaps().list(siteUrl=site_url).execute()
        entries = sitemaps.get("sitemap")
        if not entries:
            return f"No sitemaps found for {site_url}."

        # Format the results
        result_lines = [
            f"Sitemaps for {site_url}:",
            "-" * 80,
            # NOTE(review): the "Indexed URLs" column is filled from the
            # "submitted" count of the web content entry below.
            "Path | Last Downloaded | Status | Indexed URLs | Errors",
            "-" * 80,
        ]

        for sitemap in entries:
            path = sitemap.get("path", "Unknown")

            # Show the download time in a shorter form when it parses;
            # otherwise keep the raw value.
            last_downloaded = sitemap.get("lastDownloaded", "Never")
            if last_downloaded != "Never":
                try:
                    dt = datetime.fromisoformat(last_downloaded.replace('Z', '+00:00'))
                    last_downloaded = dt.strftime("%Y-%m-%d %H:%M")
                except (ValueError, TypeError, AttributeError):
                    pass

            # The count may arrive as a string (int64 fields are serialized
            # that way), so coerce before comparing; comparing str with int
            # would raise TypeError and fail the whole listing.
            errors = sitemap.get("errors", 0)
            try:
                error_count = int(errors)
            except (TypeError, ValueError):
                error_count = 0
            status = "Has errors" if error_count > 0 else "Valid"

            # URL count comes from the first "web" content entry, if any
            indexed_urls = "N/A"
            for content in sitemap.get("contents", []):
                if content.get("type") == "web":
                    indexed_urls = content.get("submitted", "0")
                    break

            result_lines.append(f"{path} | {last_downloaded} | {status} | {indexed_urls} | {errors}")

        return "\n".join(result_lines)
    except Exception as e:
        return f"Error retrieving sitemaps: {str(e)}"
@mcp.tool()
async def inspect_url_enhanced(site_url: str, page_url: str) -> str:
    """
    Enhanced URL inspection to check indexing status and rich results in Google.

    Args:
        site_url: The URL of the site in Search Console (must be exact match, for domain properties use format: sc-domain:example.com)
        page_url: The specific URL to inspect

    Returns:
        A multi-line text report built from the fields present in the
        inspection result, a "no data" notice, or a plain-text error
        description.
    """
    try:
        service = get_gsc_service()
        request = {
            "inspectionUrl": page_url,
            "siteUrl": site_url
        }
        response = service.urlInspection().index().inspect(body=request).execute()
        if not response or "inspectionResult" not in response:
            return f"No inspection data found for {page_url}."
        inspection = response["inspectionResult"]

        result_lines = [f"URL Inspection for {page_url}:", "-" * 80]

        # Add inspection result link if available
        if "inspectionResultLink" in inspection:
            result_lines.append(f"Search Console Link: {inspection['inspectionResultLink']}")
            result_lines.append("-" * 80)

        # Indexing status section
        index_status = inspection.get("indexStatusResult", {})
        result_lines.append(f"Indexing Status: {index_status.get('verdict', 'UNKNOWN')}")
        if "coverageState" in index_status:
            result_lines.append(f"Coverage: {index_status['coverageState']}")

        # Last crawl: reformat when the timestamp parses, else show it raw
        if "lastCrawlTime" in index_status:
            raw_crawl_time = index_status["lastCrawlTime"]
            try:
                crawl_time = datetime.fromisoformat(raw_crawl_time.replace('Z', '+00:00'))
                result_lines.append(f"Last Crawled: {crawl_time.strftime('%Y-%m-%d %H:%M')}")
            except (ValueError, TypeError, AttributeError):
                result_lines.append(f"Last Crawled: {raw_crawl_time}")

        # Simple pass-through fields, reported in this order when present
        passthrough_fields = (
            ("pageFetchState", "Page Fetch"),
            ("robotsTxtState", "Robots.txt"),
            ("indexingState", "Indexing State"),
            ("googleCanonical", "Google Canonical"),
        )
        for field, label in passthrough_fields:
            if field in index_status:
                result_lines.append(f"{label}: {index_status[field]}")

        # Only show the user-declared canonical when it differs from Google's
        if "userCanonical" in index_status and index_status.get("userCanonical") != index_status.get("googleCanonical"):
            result_lines.append(f"User Canonical: {index_status['userCanonical']}")

        if "crawledAs" in index_status:
            result_lines.append(f"Crawled As: {index_status['crawledAs']}")

        # Referring URLs (limited to 5 examples)
        referring = index_status.get("referringUrls")
        if referring:
            result_lines.append("\nReferring URLs:")
            result_lines.extend(f"- {url}" for url in referring[:5])
            if len(referring) > 5:
                result_lines.append(f"... and {len(referring) - 5} more")

        # Rich results
        if "richResultsResult" in inspection:
            rich = inspection["richResultsResult"]
            result_lines.append(f"\nRich Results: {rich.get('verdict', 'UNKNOWN')}")

            if rich.get("detectedItems"):
                result_lines.append("Detected Rich Result Types:")
                for item in rich["detectedItems"]:
                    result_lines.append(f"- {item.get('richResultType', 'Unknown')}")
                    # If there are items with names, show up to 3 of them
                    subitems = item.get("items")
                    if subitems:
                        for subitem in subitems[:3]:
                            if "name" in subitem:
                                result_lines.append(f" • {subitem['name']}")
                        if len(subitems) > 3:
                            result_lines.append(f" • ... and {len(subitems) - 3} more items")

            # Check for issues
            if rich.get("richResultsIssues"):
                result_lines.append("\nRich Results Issues:")
                for issue in rich["richResultsIssues"]:
                    severity = issue.get("severity", "Unknown")
                    message = issue.get("message", "Unknown issue")
                    result_lines.append(f"- [{severity}] {message}")

        return "\n".join(result_lines)
    except Exception as e:
        return f"Error inspecting URL: {str(e)}"
@mcp.tool()
async def batch_url_inspection(site_url: str, urls: str) -> str:
    """
    Inspect multiple URLs in batch (within API limits).

    Args:
        site_url: The URL of the site in Search Console (must be exact match, for domain properties use format: sc-domain:example.com)
        urls: List of URLs to inspect, one per line

    Returns:
        A text report with one summary per URL. A failure on one URL is
        reported inline and does not stop the remaining inspections.
    """
    try:
        service = get_gsc_service()

        # One URL per line; blank lines are ignored
        url_list = [url.strip() for url in urls.split('\n') if url.strip()]
        if not url_list:
            return "No URLs provided for inspection."
        if len(url_list) > 10:
            return f"Too many URLs provided ({len(url_list)}). Please limit to 10 URLs per batch to avoid API quota issues."

        results = []
        for page_url in url_list:
            request = {
                "inspectionUrl": page_url,
                "siteUrl": site_url
            }
            try:
                # Requests are issued back to back; no delay is inserted
                # between them.
                response = service.urlInspection().index().inspect(body=request).execute()
                if not response or "inspectionResult" not in response:
                    results.append(f"{page_url}: No inspection data found")
                    continue
                inspection = response["inspectionResult"]
                index_status = inspection.get("indexStatusResult", {})

                # Get key information
                verdict = index_status.get("verdict", "UNKNOWN")
                coverage = index_status.get("coverageState", "Unknown")

                # Crawl date: reformat when it parses, else show it raw
                last_crawl = "Never"
                if "lastCrawlTime" in index_status:
                    raw_crawl_time = index_status["lastCrawlTime"]
                    try:
                        crawl_time = datetime.fromisoformat(raw_crawl_time.replace('Z', '+00:00'))
                        last_crawl = crawl_time.strftime('%Y-%m-%d')
                    except (ValueError, TypeError, AttributeError):
                        last_crawl = raw_crawl_time

                # Rich result types are listed only when the verdict is PASS
                rich_results = "None"
                if "richResultsResult" in inspection:
                    rich = inspection["richResultsResult"]
                    if rich.get("verdict") == "PASS" and rich.get("detectedItems"):
                        rich_results = ", ".join(
                            item.get("richResultType", "Unknown") for item in rich["detectedItems"]
                        )

                results.append(f"{page_url}:\n Status: {verdict} - {coverage}\n Last Crawl: {last_crawl}\n Rich Results: {rich_results}\n")
            except Exception as e:
                results.append(f"{page_url}: Error - {str(e)}")

        # Combine results
        return f"Batch URL Inspection Results for {site_url}:\n\n" + "\n".join(results)
    except Exception as e:
        return f"Error performing batch inspection: {str(e)}"
@mcp.tool()
async def check_indexing_issues(site_url: str, urls: str) -> str:
    """
    Inspect several URLs and group the findings by kind of indexing issue.

    Args:
        site_url: The URL of the site in Search Console (must be exact match, for domain properties use format: sc-domain:example.com)
        urls: List of URLs to check, one per line

    Returns:
        A text report with per-category counts followed by the affected URLs,
        or a plain-text error description.
    """
    try:
        gsc = get_gsc_service()

        # One URL per line; blank lines are dropped
        targets = [line.strip() for line in urls.split('\n') if line.strip()]
        if not targets:
            return "No URLs provided for inspection."
        if len(targets) > 10:
            return f"Too many URLs provided ({len(targets)}). Please limit to 10 URLs per batch to avoid API quota issues."

        # Findings per category; a URL may land in more than one bucket
        buckets = {
            "not_indexed": [],
            "canonical_issues": [],
            "robots_blocked": [],
            "fetch_issues": [],
            "indexed": []
        }

        for page_url in targets:
            body = {
                "inspectionUrl": page_url,
                "siteUrl": site_url
            }
            try:
                response = gsc.urlInspection().index().inspect(body=body).execute()
                if not response or "inspectionResult" not in response:
                    buckets["not_indexed"].append(f"{page_url} - No inspection data found")
                    continue
                status = response["inspectionResult"].get("indexStatusResult", {})

                # Indexed vs. not indexed
                verdict = status.get("verdict", "UNKNOWN")
                coverage = status.get("coverageState", "Unknown")
                coverage_lower = coverage.lower()
                looks_indexed = (
                    verdict == "PASS"
                    and "not indexed" not in coverage_lower
                    and "excluded" not in coverage_lower
                )
                if looks_indexed:
                    buckets["indexed"].append(page_url)
                else:
                    buckets["not_indexed"].append(f"{page_url} - {coverage}")

                # Canonical mismatch between Google's choice and the declared one
                chosen = status.get("googleCanonical", "")
                declared = status.get("userCanonical", "")
                if chosen and declared and chosen != declared:
                    buckets["canonical_issues"].append(
                        f"{page_url} - Google chose: {chosen} instead of user-declared: {declared}"
                    )

                # robots.txt blocking
                if status.get("robotsTxtState", "") == "BLOCKED":
                    buckets["robots_blocked"].append(page_url)

                # Anything other than a successful fetch is reported
                fetch_state = status.get("pageFetchState", "")
                if fetch_state != "SUCCESSFUL":
                    buckets["fetch_issues"].append(f"{page_url} - {fetch_state}")
            except Exception as e:
                # Per-URL failures are recorded under "not indexed"
                buckets["not_indexed"].append(f"{page_url} - Error: {str(e)}")

        # Summary counts
        report = [
            f"Indexing Issues Report for {site_url}:",
            "-" * 80,
            f"Total URLs checked: {len(targets)}",
            f"Indexed: {len(buckets['indexed'])}",
            f"Not indexed: {len(buckets['not_indexed'])}",
            f"Canonical issues: {len(buckets['canonical_issues'])}",
            f"Robots.txt blocked: {len(buckets['robots_blocked'])}",
            f"Fetch issues: {len(buckets['fetch_issues'])}",
            "-" * 80,
        ]

        # Detailed sections, emitted only for non-empty categories
        sections = (
            ("not_indexed", "\nNot Indexed URLs:"),
            ("canonical_issues", "\nCanonical Issues:"),
            ("robots_blocked", "\nRobots.txt Blocked URLs:"),
            ("fetch_issues", "\nFetch Issues:"),
        )
        for bucket_name, heading in sections:
            findings = buckets[bucket_name]
            if findings:
                report.append(heading)
                report.extend(f"- {finding}" for finding in findings)

        return "\n".join(report)
    except Exception as e:
        return f"Error checking indexing issues: {str(e)}"
@mcp.tool()
async def get_performance_overview(site_url: str, days: int = 28) -> str:
    """
    Get a performance overview for a specific property.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        days: Number of days to look back (default: 28)

    Returns:
        Totals for the period followed by a per-day trend table, a "no data"
        notice, or a plain-text error description.
    """
    try:
        service = get_gsc_service()

        # Calculate date range
        end_date = datetime.now().date()
        start_date = end_date - timedelta(days=days)
        start_str = start_date.strftime("%Y-%m-%d")
        end_str = end_date.strftime("%Y-%m-%d")

        # Get total metrics
        total_request = {
            "startDate": start_str,
            "endDate": end_str,
            "dimensions": [],  # No dimensions for totals
            "rowLimit": 1
        }
        total_response = service.searchanalytics().query(siteUrl=site_url, body=total_request).execute()

        result_lines = [f"Performance Overview for {site_url} (last {days} days):"]
        result_lines.append("-" * 80)

        # Without totals there is nothing to trend, so skip the second query
        if not total_response.get("rows"):
            result_lines.append("No data available for the selected period.")
            return "\n".join(result_lines)

        totals = total_response["rows"][0]
        result_lines.append(f"Total Clicks: {totals.get('clicks', 0):,}")
        result_lines.append(f"Total Impressions: {totals.get('impressions', 0):,}")
        result_lines.append(f"Average CTR: {totals.get('ctr', 0) * 100:.2f}%")
        result_lines.append(f"Average Position: {totals.get('position', 0):.1f}")

        # Get by date for trend. start_date..end_date covers days + 1
        # calendar dates, so request that many rows to avoid dropping a day.
        date_request = {
            "startDate": start_str,
            "endDate": end_str,
            "dimensions": ["date"],
            "rowLimit": max(days + 1, 1)
        }
        date_response = service.searchanalytics().query(siteUrl=site_url, body=date_request).execute()

        # Add trend data
        if date_response.get("rows"):
            result_lines.append("\nDaily Trend:")
            result_lines.append("Date | Clicks | Impressions | CTR | Position")
            result_lines.append("-" * 80)

            # Sort by date
            for row in sorted(date_response["rows"], key=lambda r: r["keys"][0]):
                date_str = row["keys"][0]
                # Format date from YYYY-MM-DD to MM/DD; keep raw text if it
                # does not parse
                try:
                    date_formatted = datetime.strptime(date_str, "%Y-%m-%d").strftime("%m/%d")
                except (ValueError, TypeError):
                    date_formatted = date_str
                clicks = row.get("clicks", 0)
                impressions = row.get("impressions", 0)
                ctr = row.get("ctr", 0) * 100
                position = row.get("position", 0)
                result_lines.append(f"{date_formatted} | {clicks:.0f} | {impressions:.0f} | {ctr:.2f}% | {position:.1f}")

        return "\n".join(result_lines)
    except Exception as e:
        return f"Error retrieving performance overview: {str(e)}"
@mcp.tool()
async def get_advanced_search_analytics(
    site_url: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    dimensions: str = "query",
    search_type: str = "WEB",
    row_limit: int = 1000,
    start_row: int = 0,
    sort_by: str = "clicks",
    sort_direction: str = "descending",
    filter_dimension: Optional[str] = None,
    filter_operator: str = "contains",
    filter_expression: Optional[str] = None
) -> str:
    """
    Get advanced search analytics data with sorting, filtering, and pagination.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        start_date: Start date in YYYY-MM-DD format (defaults to 28 days ago)
        end_date: End date in YYYY-MM-DD format (defaults to today)
        dimensions: Dimensions to group by, comma-separated (e.g., "query,page,device")
        search_type: Type of search results (WEB, IMAGE, VIDEO, NEWS, DISCOVER)
        row_limit: Maximum number of rows to return (max 25000)
        start_row: Starting row for pagination
        sort_by: Metric to sort by (clicks, impressions, ctr, position)
        sort_direction: Sort direction (ascending or descending)
        filter_dimension: Dimension to filter on (query, page, country, device)
        filter_operator: Filter operator (contains, equals, notContains, notEquals)
        filter_expression: Filter expression value

    Returns:
        A pipe-separated table with a parameter summary, a "no data" message
        listing the parameters used, or a plain-text error description.
    """
    try:
        service = get_gsc_service()

        # Calculate date range if not provided
        if not end_date:
            end_date = datetime.now().date().strftime("%Y-%m-%d")
        if not start_date:
            start_date = (datetime.now().date() - timedelta(days=28)).strftime("%Y-%m-%d")

        # Parse dimensions
        dimension_list = [d.strip() for d in dimensions.split(",")]

        # The limit actually sent; pagination hints must use this value, not
        # the caller's uncapped row_limit.
        effective_limit = min(row_limit, 25000)  # Cap at API maximum

        request = {
            "startDate": start_date,
            "endDate": end_date,
            "dimensions": dimension_list,
            "rowLimit": effective_limit,
            "startRow": start_row,
            "searchType": search_type.upper()
        }

        # Add sorting
        # NOTE(review): confirm the API honors "orderBy" in this request body
        # -- TODO verify against the searchanalytics.query request schema.
        metric_map = {
            "clicks": "CLICK_COUNT",
            "impressions": "IMPRESSION_COUNT",
            "ctr": "CTR",
            "position": "POSITION"
        }
        if sort_by and sort_by in metric_map:
            request["orderBy"] = [{
                "metric": metric_map[sort_by],
                "direction": sort_direction.lower()
            }]

        # A filter is sent only when both dimension and expression are given;
        # the same flag drives what the output reports.
        filter_applied = bool(filter_dimension and filter_expression)
        if filter_applied:
            request["dimensionFilterGroups"] = [{
                "filters": [{
                    "dimension": filter_dimension,
                    "operator": filter_operator,
                    "expression": filter_expression
                }]
            }]
            filter_summary = f"Filter: {filter_dimension} {filter_operator} '{filter_expression}'"
        else:
            filter_summary = None

        # Execute request
        response = service.searchanalytics().query(siteUrl=site_url, body=request).execute()
        rows = response.get("rows")

        if not rows:
            # Build the message line by line. Hanging a conditional
            # expression off implicitly concatenated literals would replace
            # the whole message whenever no filter is set.
            filter_line = f"- {filter_summary}" if filter_summary else "- No filter applied"
            return "\n".join([
                f"No search analytics data found for {site_url} with the specified parameters.\n",
                "Parameters used:",
                f"- Date range: {start_date} to {end_date}",
                f"- Dimensions: {dimensions}",
                f"- Search type: {search_type}",
                filter_line,
            ])

        # Format results
        result_lines = [f"Search analytics for {site_url}:"]
        result_lines.append(f"Date range: {start_date} to {end_date}")
        result_lines.append(f"Search type: {search_type}")
        if filter_summary:
            result_lines.append(filter_summary)
        result_lines.append(f"Showing rows {start_row+1} to {start_row+len(rows)} (sorted by {sort_by} {sort_direction})")
        result_lines.append("\n" + "-" * 80 + "\n")

        # Create header based on dimensions
        header = [dim.capitalize() for dim in dimension_list]
        header.extend(["Clicks", "Impressions", "CTR", "Position"])
        result_lines.append(" | ".join(header))
        result_lines.append("-" * 80)

        # Add data rows
        for row in rows:
            # Dimension values are truncated to 100 characters
            data = [dim_value[:100] for dim_value in row.get("keys", [])]
            data.append(str(row.get("clicks", 0)))
            data.append(str(row.get("impressions", 0)))
            data.append(f"{row.get('ctr', 0) * 100:.2f}%")
            data.append(f"{row.get('position', 0):.1f}")
            result_lines.append(" | ".join(data))

        # A full page suggests there may be more results
        if len(rows) == effective_limit:
            next_start = start_row + effective_limit
            result_lines.append("\nThere may be more results available. To see the next page, use:")
            result_lines.append(f"start_row: {next_start}, row_limit: {effective_limit}")

        return "\n".join(result_lines)
    except Exception as e:
        return f"Error retrieving advanced search analytics: {str(e)}"
@mcp.tool()
async def compare_search_periods(
    site_url: str,
    period1_start: str,
    period1_end: str,
    period2_start: str,
    period2_end: str,
    dimensions: str = "query",
    limit: int = 10
) -> str:
    """
    Compare search analytics data between two time periods.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        period1_start: Start date for period 1 (YYYY-MM-DD)
        period1_end: End date for period 1 (YYYY-MM-DD)
        period2_start: Start date for period 2 (YYYY-MM-DD)
        period2_end: End date for period 2 (YYYY-MM-DD)
        dimensions: Dimensions to group by (default: query)
        limit: Number of top results to compare (default: 10)

    Returns:
        A pipe-separated table of the entries with the largest absolute change
        in clicks, a "no data" notice, or a plain-text error description.
    """
    try:
        service = get_gsc_service()

        # Parse dimensions
        dimension_list = [d.strip() for d in dimensions.split(",")]

        def fetch_rows(start, end):
            # Get more rows than displayed so items can be matched between periods
            body = {
                "startDate": start,
                "endDate": end,
                "dimensions": dimension_list,
                "rowLimit": 1000
            }
            return service.searchanalytics().query(siteUrl=site_url, body=body).execute().get("rows", [])

        period1_rows = fetch_rows(period1_start, period1_end)
        period2_rows = fetch_rows(period2_start, period2_end)
        if not period1_rows and not period2_rows:
            return f"No data found for either period for {site_url}."

        # Index rows by their dimension values for lookup
        period1_data = {tuple(row.get("keys", [])): row for row in period1_rows}
        period2_data = {tuple(row.get("keys", [])): row for row in period2_rows}

        def pct_change(before, delta):
            # None marks "undefined" (nothing to compare against in period 1)
            return (delta / before) * 100 if before > 0 else None

        missing = {"clicks": 0, "impressions": 0, "ctr": 0, "position": 0}
        comparison_data = []
        for key in set(period1_data) | set(period2_data):
            p1_row = period1_data.get(key, missing)
            p2_row = period2_data.get(key, missing)
            p1_clicks, p2_clicks = p1_row.get("clicks", 0), p2_row.get("clicks", 0)
            p1_imps, p2_imps = p1_row.get("impressions", 0), p2_row.get("impressions", 0)
            p1_ctr, p2_ctr = p1_row.get("ctr", 0), p2_row.get("ctr", 0)
            p1_pos, p2_pos = p1_row.get("position", 0), p2_row.get("position", 0)
            click_diff = p2_clicks - p1_clicks
            imp_diff = p2_imps - p1_imps
            comparison_data.append({
                "key": key,
                "p1_clicks": p1_clicks,
                "p2_clicks": p2_clicks,
                "click_diff": click_diff,
                "click_pct": pct_change(p1_clicks, click_diff),
                "p1_impressions": p1_imps,
                "p2_impressions": p2_imps,
                "imp_diff": imp_diff,
                "imp_pct": pct_change(p1_imps, imp_diff),
                "p1_ctr": p1_ctr,
                "p2_ctr": p2_ctr,
                "ctr_diff": p2_ctr - p1_ctr,
                "p1_position": p1_pos,
                "p2_position": p2_pos,
                # Note: lower position is better, so positive means improvement
                "pos_diff": p1_pos - p2_pos
            })

        # Sort by absolute click difference (can change to other metrics)
        comparison_data.sort(key=lambda x: abs(x["click_diff"]), reverse=True)

        # A negative limit would slice from the end; treat it as zero
        shown = comparison_data[:max(limit, 0)]

        # Format results
        result_lines = [f"Search analytics comparison for {site_url}:"]
        result_lines.append(f"Period 1: {period1_start} to {period1_end}")
        result_lines.append(f"Period 2: {period2_start} to {period2_end}")
        result_lines.append(f"Dimension(s): {dimensions}")
        result_lines.append(f"Top {len(shown)} results by change in clicks:")
        result_lines.append("\n" + "-" * 100 + "\n")

        # Create header
        dim_header = " | ".join([d.capitalize() for d in dimension_list])
        result_lines.append(f"{dim_header} | P1 Clicks | P2 Clicks | Change | % | P1 Pos | P2 Pos | Pos Δ")
        result_lines.append("-" * 100)

        for item in shown:
            key_str = " | ".join([str(k)[:100] for k in item["key"]])
            click_pct = item["click_pct"]
            click_pct_str = f"{click_pct:.1f}%" if click_pct is not None else "N/A"
            # "+.0f" formats both int and float counts; "+d" raises
            # ValueError when the metric arrives as a float.
            result_lines.append(
                f"{key_str} | {item['p1_clicks']} | {item['p2_clicks']} | "
                f"{item['click_diff']:+.0f} | {click_pct_str} | "
                f"{item['p1_position']:.1f} | {item['p2_position']:.1f} | {item['pos_diff']:+.1f}"
            )

        return "\n".join(result_lines)
    except Exception as e:
        return f"Error comparing search periods: {str(e)}"
@mcp.tool()
async def get_search_by_page_query(
    site_url: str,
    page_url: str,
    days: int = 28
) -> str:
    """
    Show the search queries recorded for one page as a text table.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        page_url: The specific page URL to analyze
        days: Number of days to look back (default: 28)

    Returns:
        A pipe-separated table (at most 20 rows requested) ending in a TOTAL
        line, a "no data" notice, or a plain-text error description.
    """
    try:
        gsc = get_gsc_service()

        # Look-back window ending today
        window_end = datetime.now().date()
        window_start = window_end - timedelta(days=days)

        # Restrict to rows whose page equals page_url, grouped by query
        page_filter = {
            "dimension": "page",
            "operator": "equals",
            "expression": page_url
        }
        body = {
            "startDate": window_start.strftime("%Y-%m-%d"),
            "endDate": window_end.strftime("%Y-%m-%d"),
            "dimensions": ["query"],
            "dimensionFilterGroups": [{"filters": [page_filter]}],
            "rowLimit": 20,  # Top 20 queries for this page
            "orderBy": [{"metric": "CLICK_COUNT", "direction": "descending"}]
        }
        response = gsc.searchanalytics().query(siteUrl=site_url, body=body).execute()

        rows = response.get("rows")
        if not rows:
            return f"No search data found for page {page_url} in the last {days} days."

        table = [
            f"Search queries for page {page_url} (last {days} days):",
            "\n" + "-" * 80 + "\n",
            "Query | Clicks | Impressions | CTR | Position",
            "-" * 80,
        ]

        # Accumulate totals while emitting the rows
        total_clicks = 0
        total_impressions = 0
        for row in rows:
            query_text = row.get("keys", ["Unknown"])[0]
            row_clicks = row.get("clicks", 0)
            row_impressions = row.get("impressions", 0)
            ctr_percent = row.get("ctr", 0) * 100
            avg_position = row.get("position", 0)
            table.append(
                f"{query_text[:100]} | {row_clicks} | {row_impressions} | {ctr_percent:.2f}% | {avg_position:.1f}"
            )
            total_clicks += row_clicks
            total_impressions += row_impressions

        # Overall CTR is derived from the summed clicks and impressions
        if total_impressions > 0:
            avg_ctr = total_clicks / total_impressions * 100
        else:
            avg_ctr = 0
        table.append("-" * 80)
        table.append(f"TOTAL | {total_clicks} | {total_impressions} | {avg_ctr:.2f}% | -")

        return "\n".join(table)
    except Exception as e:
        return f"Error retrieving page query data: {str(e)}"
def _format_sitemap_timestamp(raw_value: Optional[str]) -> str:
    """Render an ISO-8601 timestamp from the API as 'YYYY-MM-DD HH:MM'.

    Returns "Never" when the value is missing or empty, and the raw value
    (as a string) when it cannot be parsed.
    """
    if not raw_value:
        return "Never"
    try:
        # fromisoformat() on older Python versions rejects a trailing 'Z',
        # so rewrite it as an explicit UTC offset first.
        parsed = datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        return str(raw_value)
    return parsed.strftime("%Y-%m-%d %H:%M")


@mcp.tool()
async def list_sitemaps_enhanced(site_url: str, sitemap_index: Optional[str] = None) -> str:
    """
    List all sitemaps for a specific Search Console property with detailed information.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        sitemap_index: Optional sitemap index URL to list child sitemaps

    Returns:
        A pipe-delimited text table with one row per sitemap, a "no sitemaps
        found" message, or an error message if the request fails.
    """
    try:
        service = get_gsc_service()

        # Only pass sitemapIndex when the caller asked for children of an index.
        list_kwargs = {"siteUrl": site_url}
        if sitemap_index:
            list_kwargs["sitemapIndex"] = sitemap_index
            source = f"child sitemaps from index: {sitemap_index}"
        else:
            source = "all submitted sitemaps"
        response = service.sitemaps().list(**list_kwargs).execute()

        entries = response.get("sitemap") or []
        if not entries:
            return f"No sitemaps found for {site_url}" + (f" in index {sitemap_index}" if sitemap_index else ".")

        # Title and table header
        result_lines = [
            f"Sitemaps for {site_url} ({source}):",
            "-" * 100,
            "Path | Last Submitted | Last Downloaded | Type | URLs | Errors | Warnings",
            "-" * 100,
        ]

        for entry in entries:
            path = entry.get("path", "Unknown")
            last_submitted = _format_sitemap_timestamp(entry.get("lastSubmitted"))
            last_downloaded = _format_sitemap_timestamp(entry.get("lastDownloaded"))
            sitemap_type = "Index" if entry.get("isSitemapsIndex", False) else "Sitemap"
            errors = entry.get("errors", 0)
            warnings = entry.get("warnings", 0)

            # Report the submitted count of the first "web" content entry, if any.
            url_count = next(
                (
                    content.get("submitted", "0")
                    for content in entry.get("contents") or []
                    if content.get("type") == "web"
                ),
                "N/A",
            )

            result_lines.append(f"{path} | {last_submitted} | {last_downloaded} | {sitemap_type} | {url_count} | {errors} | {warnings}")

        # Mention sitemaps the API still reports as pending
        pending_count = sum(1 for entry in entries if entry.get("isPending", False))
        if pending_count > 0:
            result_lines.append(f"\nNote: {pending_count} sitemaps are still pending processing by Google.")

        return "\n".join(result_lines)
    except Exception as e:
        # Tool boundary: report failures as text instead of raising.
        return f"Error retrieving sitemaps: {str(e)}"
@mcp.tool()
async def get_sitemap_details(site_url: str, sitemap_url: str) -> str:
    """
    Get detailed information about a specific sitemap.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        sitemap_url: The full URL of the sitemap to inspect

    Returns:
        A multi-line text report (type, status, dates, error/warning counts
        and content breakdown), or an error message if the request fails.
    """
    try:
        service = get_gsc_service()

        # Get sitemap details
        details = service.sitemaps().get(siteUrl=site_url, feedpath=sitemap_url).execute()
        if not details:
            return f"No details found for sitemap {sitemap_url}."

        result_lines = [f"Sitemap Details for {sitemap_url}:", "-" * 80]

        # Basic info
        is_index = details.get("isSitemapsIndex", False)
        result_lines.append(f"Type: {'Sitemap Index' if is_index else 'Sitemap'}")

        # Status
        is_pending = details.get("isPending", False)
        result_lines.append(f"Status: {'Pending processing' if is_pending else 'Processed'}")

        # Dates: show a compact form when parseable, otherwise the raw value.
        for key, label in (("lastSubmitted", "Last Submitted"), ("lastDownloaded", "Last Downloaded")):
            if key not in details:
                continue
            raw_value = details[key]
            try:
                # Rewrite a trailing 'Z' as an explicit UTC offset for fromisoformat().
                parsed = datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
                shown = parsed.strftime("%Y-%m-%d %H:%M")
            except (AttributeError, TypeError, ValueError):
                shown = raw_value
            result_lines.append(f"{label}: {shown}")

        # Errors and warnings
        result_lines.append(f"Errors: {details.get('errors', 0)}")
        result_lines.append(f"Warnings: {details.get('warnings', 0)}")

        # Content breakdown
        contents = details.get("contents")
        if contents:
            result_lines.append("\nContent Breakdown:")
            for content in contents:
                content_type = content.get("type", "Unknown").upper()
                submitted = content.get("submitted", 0)
                indexed = content.get("indexed", "N/A")
                result_lines.append(f"- {content_type}: {submitted} submitted, {indexed} indexed")

        # If it's an index, suggest how to list child sitemaps
        if is_index:
            result_lines.append("\nThis is a sitemap index. To list child sitemaps, use:")
            result_lines.append(f"list_sitemaps_enhanced with sitemap_index={sitemap_url}")

        return "\n".join(result_lines)
    except Exception as e:
        # Tool boundary: report failures as text instead of raising.
        return f"Error retrieving sitemap details: {str(e)}"
@mcp.tool()
async def submit_sitemap(site_url: str, sitemap_url: str) -> str:
    """
    Submit a new sitemap or resubmit an existing one to Google.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        sitemap_url: The full URL of the sitemap to submit

    Returns:
        A success message (with submission time and status when they can be
        fetched), or an error message if the submission itself fails.
    """
    # NOTE(review): the module-level SCOPES list only requests
    # "webmasters.readonly"; submitting presumably needs the read/write
    # scope — confirm against the Search Console API documentation.
    try:
        service = get_gsc_service()
        service.sitemaps().submit(siteUrl=site_url, feedpath=sitemap_url).execute()
    except Exception as e:
        return f"Error submitting sitemap: {str(e)}"

    # The submission succeeded; fetching details is best-effort only, so any
    # ordinary failure here falls back to the basic success message.
    try:
        details = service.sitemaps().get(siteUrl=site_url, feedpath=sitemap_url).execute()

        result_lines = [f"Successfully submitted sitemap: {sitemap_url}"]

        # Add submission time if available
        if "lastSubmitted" in details:
            raw_value = details["lastSubmitted"]
            try:
                # Rewrite a trailing 'Z' as an explicit UTC offset for fromisoformat().
                parsed = datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
                shown = parsed.strftime("%Y-%m-%d %H:%M")
            except (AttributeError, TypeError, ValueError):
                shown = raw_value
            result_lines.append(f"Submission time: {shown}")

        # Add processing status (treated as pending when the field is absent)
        is_pending = details.get("isPending", True)
        result_lines.append(f"Status: {'Pending processing' if is_pending else 'Processing started'}")

        # Add note about processing time
        result_lines.append("\nNote: Google may take some time to process the sitemap. Check back later for full details.")

        return "\n".join(result_lines)
    except Exception:
        # If we can't get details, just return basic success message
        return f"Successfully submitted sitemap: {sitemap_url}\n\nGoogle will queue it for processing."
@mcp.tool()
async def delete_sitemap(site_url: str, sitemap_url: str) -> str:
    """
    Delete (unsubmit) a sitemap from Google Search Console.

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        sitemap_url: The full URL of the sitemap to delete

    Returns:
        A success message, a "not found" message when the lookup returns
        HTTP 404, or an error message for any other failure.
    """
    try:
        service = get_gsc_service()

        # First check if the sitemap exists
        try:
            service.sitemaps().get(siteUrl=site_url, feedpath=sitemap_url).execute()
        except HttpError as err:
            # Compare the HTTP status code rather than searching the message
            # text: the text can contain "404" for unrelated reasons (for
            # example as part of a URL).
            if getattr(err.resp, "status", None) == 404:
                return f"Sitemap not found: {sitemap_url}. It may have already been deleted or was never submitted."
            raise

        # Delete the sitemap
        service.sitemaps().delete(siteUrl=site_url, feedpath=sitemap_url).execute()

        return f"Successfully deleted sitemap: {sitemap_url}\n\nNote: This only removes the sitemap from Search Console. Any URLs already indexed will remain in Google's index."
    except Exception as e:
        # Tool boundary: report failures as text instead of raising.
        return f"Error deleting sitemap: {str(e)}"
@mcp.tool()
async def manage_sitemaps(site_url: str, action: str, sitemap_url: str = None, sitemap_index: str = None) -> str:
    """
    All-in-one tool to manage sitemaps (list, get details, submit, delete).

    Args:
        site_url: The URL of the site in Search Console (must be exact match)
        action: The action to perform (list, details, submit, delete)
        sitemap_url: The full URL of the sitemap (required for details, submit, delete)
        sitemap_index: Optional sitemap index URL for listing child sitemaps (only used with 'list' action)
    """
    try:
        # Normalise the action so matching ignores case and surrounding whitespace.
        chosen = action.lower().strip()

        # Supported actions, in display order, mapped to whether they need sitemap_url.
        needs_sitemap_url = {
            "list": False,
            "details": True,
            "submit": True,
            "delete": True,
        }

        if chosen not in needs_sitemap_url:
            return f"Invalid action: {chosen}. Please use one of: {', '.join(needs_sitemap_url)}"

        if needs_sitemap_url[chosen] and not sitemap_url:
            return f"The {chosen} action requires a sitemap_url parameter."

        # Listing is the only action that takes the optional index argument.
        if chosen == "list":
            return await list_sitemaps_enhanced(site_url, sitemap_index)

        # The remaining actions all take (site_url, sitemap_url).
        single_sitemap_tools = {
            "details": get_sitemap_details,
            "submit": submit_sitemap,
            "delete": delete_sitemap,
        }
        return await single_sitemap_tools[chosen](site_url, sitemap_url)
    except Exception as e:
        return f"Error managing sitemaps: {str(e)}"
@mcp.tool()
async def get_creator_info() -> str:
    """
    Provides information about Amin Foroutan, the creator of the MCP-GSC tool.

    Returns:
        A fixed Markdown-formatted profile string.
    """
    # The text is static, so return the literal directly.
    return """
# About the Creator: Amin Foroutan
Amin Foroutan is an SEO consultant with over a decade of experience, specializing in technical SEO, Python-driven tools, and data analysis for SEO performance.
## Connect with Amin:
- **LinkedIn**: [Amin Foroutan](https://www.linkedin.com/in/ma-foroutan/)
- **Personal Website**: [aminforoutan.com](https://aminforoutan.com/)
- **YouTube**: [Amin Forout](https://www.youtube.com/channel/UCW7tPXg-rWdH4YzLrcAdBIw)
- **X (Twitter)**: [@aminfseo](https://x.com/aminfseo)
## Notable Projects:
Amin has created several popular SEO tools including:
- Advanced GSC Visualizer (6.4K+ users)
- SEO Render Insight Tool (3.5K+ users)
- Google AI Overview Impact Analysis (1.2K+ users)
- Google AI Overview Citation Analysis (900+ users)
- SEMRush Enhancer (570+ users)
- SEO Page Inspector (115+ users)
## Expertise:
Amin combines technical SEO knowledge with programming skills to create innovative solutions for SEO challenges.
"""
if __name__ == "__main__":
    # Run the MCP server using the stdio transport when executed as a script.
    mcp.run(transport="stdio")