import asyncio
import json
import os
from datetime import datetime, timedelta
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# MCP
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("gsc-server")
# Path to your service account JSON or user credentials JSON
# First check if GSC_CREDENTIALS_PATH environment variable is set
# Then try looking in the script directory and current working directory as fallbacks
GSC_CREDENTIALS_PATH = os.environ.get("GSC_CREDENTIALS_PATH")
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
POSSIBLE_CREDENTIAL_PATHS = [
GSC_CREDENTIALS_PATH, # First try the environment variable if set
os.path.join(SCRIPT_DIR, "service_account_credentials.json"),
os.path.join(os.getcwd(), "service_account_credentials.json"),
# Add any other potential paths here
]
# OAuth client secrets file path
OAUTH_CLIENT_SECRETS_FILE = os.environ.get("GSC_OAUTH_CLIENT_SECRETS_FILE")
if not OAUTH_CLIENT_SECRETS_FILE:
OAUTH_CLIENT_SECRETS_FILE = os.path.join(SCRIPT_DIR, "client_secrets.json")
# Token file path for storing OAuth tokens
TOKEN_FILE = os.path.join(SCRIPT_DIR, "token.json")
# Environment variable to skip OAuth authentication
SKIP_OAUTH = os.environ.get("GSC_SKIP_OAUTH", "").lower() in ("true", "1", "yes")
SCOPES = ["https://www.googleapis.com/auth/webmasters"]
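# Example configuration (illustrative paths only; adjust to your environment):
#   export GSC_CREDENTIALS_PATH="/path/to/service_account_credentials.json"
#   export GSC_OAUTH_CLIENT_SECRETS_FILE="/path/to/client_secrets.json"
#   export GSC_SKIP_OAUTH=true   # use the service account only, skip the interactive OAuth flow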
def get_gsc_service():
"""
Returns an authorized Search Console service object.
First tries OAuth authentication, then falls back to service account.
"""
# Try OAuth authentication first if not skipped
if not SKIP_OAUTH:
try:
return get_gsc_service_oauth()
except Exception as e:
# If OAuth fails, try service account
pass
# Try service account authentication
for cred_path in POSSIBLE_CREDENTIAL_PATHS:
if cred_path and os.path.exists(cred_path):
try:
creds = service_account.Credentials.from_service_account_file(
cred_path, scopes=SCOPES
)
return build("searchconsole", "v1", credentials=creds)
except Exception as e:
continue # Try the next path if this one fails
# If we get here, none of the authentication methods worked
raise FileNotFoundError(
f"Authentication failed. Please either:\n"
f"1. Set up OAuth by placing a client_secrets.json file in the script directory, or\n"
f"2. Set the GSC_CREDENTIALS_PATH environment variable or place a service account credentials file in one of these locations: "
f"{', '.join([p for p in POSSIBLE_CREDENTIAL_PATHS[1:] if p])}"
)
def get_gsc_service_oauth():
"""
Returns an authorized Search Console service object using OAuth.
"""
creds = None
# Check if token file exists
if os.path.exists(TOKEN_FILE):
creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
# If credentials don't exist or are invalid, get new ones
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
# Check if client secrets file exists
if not os.path.exists(OAUTH_CLIENT_SECRETS_FILE):
raise FileNotFoundError(
f"OAuth client secrets file not found. Please place a client_secrets.json file in the script directory "
f"or set the GSC_OAUTH_CLIENT_SECRETS_FILE environment variable."
)
# Start OAuth flow
flow = InstalledAppFlow.from_client_secrets_file(OAUTH_CLIENT_SECRETS_FILE, SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for future use
with open(TOKEN_FILE, 'w') as token:
token.write(creds.to_json())
# Build and return the service
return build("searchconsole", "v1", credentials=creds)
@mcp.tool()
async def list_properties() -> str:
"""
Retrieves and returns the user's Search Console properties.
"""
try:
service = get_gsc_service()
site_list = service.sites().list().execute()
# site_list is typically something like:
# {
# "siteEntry": [
# {"siteUrl": "...", "permissionLevel": "..."},
# ...
# ]
# }
sites = site_list.get("siteEntry", [])
if not sites:
return "No Search Console properties found."
# Format the results for easy reading
lines = []
for site in sites:
site_url = site.get("siteUrl", "Unknown")
permission = site.get("permissionLevel", "Unknown permission")
lines.append(f"- {site_url} ({permission})")
return "\n".join(lines)
    except FileNotFoundError:
        return (
            "Error: No Google Search Console credentials were found.\n\n"
            "To access Google Search Console, either:\n"
            "1. Set up OAuth by placing a client_secrets.json file in the script directory, or\n"
            "2. Create a service account in Google Cloud Console, download its JSON key file, save it as\n"
            "   'service_account_credentials.json' next to this script (or set GSC_CREDENTIALS_PATH to its path),\n"
            "   and share your GSC properties with the service account email"
        )
except Exception as e:
return f"Error retrieving properties: {str(e)}"
@mcp.tool()
async def add_site(site_url: str) -> str:
"""
Add a site to your Search Console properties.
Args:
        site_url: The URL of the site to add. Must exactly match the property URL, e.g. https://example.com, https://www.example.com, or https://subdomain.example.com/path/; for domain properties use the format sc-domain:example.com
"""
try:
service = get_gsc_service()
# Add the site
response = service.sites().add(siteUrl=site_url).execute()
# Format the response
result_lines = [f"Site {site_url} has been added to Search Console."]
# Add permission level if available
if "permissionLevel" in response:
result_lines.append(f"Permission level: {response['permissionLevel']}")
return "\n".join(result_lines)
except HttpError as e:
        try:
            error_content = json.loads(e.content.decode('utf-8'))
        except (ValueError, AttributeError):
            error_content = {}
        error_details = error_content.get('error', {})
        error_code = e.resp.status
        error_message = error_details.get('message', str(e))
        error_reason = (error_details.get('errors') or [{}])[0].get('reason', '')
if error_code == 409:
return f"Site {site_url} is already added to Search Console."
elif error_code == 403:
if error_reason == 'forbidden':
return f"Error: You don't have permission to add this site. Please verify ownership first."
elif error_reason == 'quotaExceeded':
return f"Error: API quota exceeded. Please try again later."
else:
return f"Error: Permission denied. {error_message}"
elif error_code == 400:
if error_reason == 'invalidParameter':
return f"Error: Invalid site URL format. Please check the URL format and try again."
else:
return f"Error: Bad request. {error_message}"
elif error_code == 401:
return f"Error: Unauthorized. Please check your credentials."
elif error_code == 429:
return f"Error: Too many requests. Please try again later."
elif error_code == 500:
return f"Error: Internal server error from Google Search Console API. Please try again later."
elif error_code == 503:
return f"Error: Service unavailable. Google Search Console API is currently down. Please try again later."
else:
return f"Error adding site (HTTP {error_code}): {error_message}"
except Exception as e:
return f"Error adding site: {str(e)}"
@mcp.tool()
async def delete_site(site_url: str) -> str:
"""
Remove a site from your Search Console properties.
Args:
        site_url: The URL of the site to remove. Must exactly match the property URL, e.g. https://example.com, https://www.example.com, or https://subdomain.example.com/path/; for domain properties use the format sc-domain:example.com
"""
try:
service = get_gsc_service()
# Delete the site
service.sites().delete(siteUrl=site_url).execute()
return f"Site {site_url} has been removed from Search Console."
except HttpError as e:
        try:
            error_content = json.loads(e.content.decode('utf-8'))
        except (ValueError, AttributeError):
            error_content = {}
        error_details = error_content.get('error', {})
        error_code = e.resp.status
        error_message = error_details.get('message', str(e))
        error_reason = (error_details.get('errors') or [{}])[0].get('reason', '')
if error_code == 404:
return f"Site {site_url} was not found in Search Console."
elif error_code == 403:
if error_reason == 'forbidden':
return f"Error: You don't have permission to remove this site."
elif error_reason == 'quotaExceeded':
return f"Error: API quota exceeded. Please try again later."
else:
return f"Error: Permission denied. {error_message}"
elif error_code == 400:
if error_reason == 'invalidParameter':
return f"Error: Invalid site URL format. Please check the URL format and try again."
else:
return f"Error: Bad request. {error_message}"
elif error_code == 401:
return f"Error: Unauthorized. Please check your credentials."
elif error_code == 429:
return f"Error: Too many requests. Please try again later."
elif error_code == 500:
return f"Error: Internal server error from Google Search Console API. Please try again later."
elif error_code == 503:
return f"Error: Service unavailable. Google Search Console API is currently down. Please try again later."
else:
return f"Error removing site (HTTP {error_code}): {error_message}"
except Exception as e:
return f"Error removing site: {str(e)}"
@mcp.tool()
async def get_search_analytics(site_url: str, days: int = 28, dimensions: str = "query") -> str:
"""
Get search analytics data for a specific property.
Args:
site_url: The URL of the site in Search Console (must be exact match)
days: Number of days to look back (default: 28)
dimensions: Dimensions to group by (default: query). Options: query, page, device, country, date
You can provide multiple dimensions separated by comma (e.g., "query,page")
"""
try:
service = get_gsc_service()
# Calculate date range
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days)
# Parse dimensions
dimension_list = [d.strip() for d in dimensions.split(",")]
# Build request
request = {
"startDate": start_date.strftime("%Y-%m-%d"),
"endDate": end_date.strftime("%Y-%m-%d"),
"dimensions": dimension_list,
"rowLimit": 20 # Limit to top 20 results
}
# Execute request
response = service.searchanalytics().query(siteUrl=site_url, body=request).execute()
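        # For reference, each returned row looks roughly like this (values illustrative):
        #   {"keys": ["best running shoes"], "clicks": 120, "impressions": 4300,
        #    "ctr": 0.0279, "position": 6.4}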
if not response.get("rows"):
return f"No search analytics data found for {site_url} in the last {days} days."
# Format results
result_lines = [f"Search analytics for {site_url} (last {days} days):"]
result_lines.append("\n" + "-" * 80 + "\n")
# Create header based on dimensions
header = []
for dim in dimension_list:
header.append(dim.capitalize())
header.extend(["Clicks", "Impressions", "CTR", "Position"])
result_lines.append(" | ".join(header))
result_lines.append("-" * 80)
# Add data rows
for row in response.get("rows", []):
data = []
# Add dimension values
for dim_value in row.get("keys", []):
                data.append(dim_value[:100])  # Truncate long dimension values to 100 characters
# Add metrics
data.append(str(row.get("clicks", 0)))
data.append(str(row.get("impressions", 0)))
data.append(f"{row.get('ctr', 0) * 100:.2f}%")
data.append(f"{row.get('position', 0):.1f}")
result_lines.append(" | ".join(data))
return "\n".join(result_lines)
except Exception as e:
return f"Error retrieving search analytics: {str(e)}"
@mcp.tool()
async def get_site_details(site_url: str) -> str:
"""
Get detailed information about a specific Search Console property.
Args:
site_url: The URL of the site in Search Console (must be exact match)
"""
try:
service = get_gsc_service()
# Get site details
site_info = service.sites().get(siteUrl=site_url).execute()
# Format the results
result_lines = [f"Site details for {site_url}:"]
result_lines.append("-" * 50)
# Add basic info
result_lines.append(f"Permission level: {site_info.get('permissionLevel', 'Unknown')}")
# Add verification info if available
if "siteVerificationInfo" in site_info:
verify_info = site_info["siteVerificationInfo"]
result_lines.append(f"Verification state: {verify_info.get('verificationState', 'Unknown')}")
if "verifiedUser" in verify_info:
result_lines.append(f"Verified by: {verify_info['verifiedUser']}")
if "verificationMethod" in verify_info:
result_lines.append(f"Verification method: {verify_info['verificationMethod']}")
# Add ownership info if available
if "ownershipInfo" in site_info:
owner_info = site_info["ownershipInfo"]
result_lines.append("\nOwnership Information:")
result_lines.append(f"Owner: {owner_info.get('owner', 'Unknown')}")
if "verificationMethod" in owner_info:
result_lines.append(f"Ownership verification: {owner_info['verificationMethod']}")
return "\n".join(result_lines)
except Exception as e:
return f"Error retrieving site details: {str(e)}"
@mcp.tool()
async def get_sitemaps(site_url: str) -> str:
"""
List all sitemaps for a specific Search Console property.
Args:
site_url: The URL of the site in Search Console (must be exact match)
"""
try:
service = get_gsc_service()
# Get sitemaps list
sitemaps = service.sitemaps().list(siteUrl=site_url).execute()
if not sitemaps.get("sitemap"):
return f"No sitemaps found for {site_url}."
# Format the results
result_lines = [f"Sitemaps for {site_url}:"]
result_lines.append("-" * 80)
# Header
result_lines.append("Path | Last Downloaded | Status | Indexed URLs | Errors")
result_lines.append("-" * 80)
# Add each sitemap
for sitemap in sitemaps.get("sitemap", []):
path = sitemap.get("path", "Unknown")
last_downloaded = sitemap.get("lastDownloaded", "Never")
# Format last downloaded date if it exists
if last_downloaded != "Never":
try:
# Convert to more readable format
dt = datetime.fromisoformat(last_downloaded.replace('Z', '+00:00'))
last_downloaded = dt.strftime("%Y-%m-%d %H:%M")
                except ValueError:
pass
status = "Valid"
if "errors" in sitemap and sitemap["errors"] > 0:
status = "Has errors"
# Get counts
warnings = sitemap.get("warnings", 0)
errors = sitemap.get("errors", 0)
# Get contents if available
indexed_urls = "N/A"
if "contents" in sitemap:
for content in sitemap["contents"]:
if content.get("type") == "web":
indexed_urls = content.get("submitted", "0")
break
result_lines.append(f"{path} | {last_downloaded} | {status} | {indexed_urls} | {errors}")
return "\n".join(result_lines)
except Exception as e:
return f"Error retrieving sitemaps: {str(e)}"
@mcp.tool()
async def inspect_url_enhanced(site_url: str, page_url: str) -> str:
"""
Enhanced URL inspection to check indexing status and rich results in Google.
Args:
site_url: The URL of the site in Search Console (must be exact match, for domain properties use format: sc-domain:example.com)
page_url: The specific URL to inspect
"""
try:
service = get_gsc_service()
# Build request
request = {
"inspectionUrl": page_url,
"siteUrl": site_url
}
# Execute request
response = service.urlInspection().index().inspect(body=request).execute()
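        # The response nests everything under "inspectionResult"; this tool reads the
        # "indexStatusResult" block (verdict, coverageState, lastCrawlTime, canonicals, ...)
        # and the "richResultsResult" block (verdict, detectedItems) from it.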
if not response or "inspectionResult" not in response:
return f"No inspection data found for {page_url}."
inspection = response["inspectionResult"]
# Format the results
result_lines = [f"URL Inspection for {page_url}:"]
result_lines.append("-" * 80)
# Add inspection result link if available
if "inspectionResultLink" in inspection:
result_lines.append(f"Search Console Link: {inspection['inspectionResultLink']}")
result_lines.append("-" * 80)
# Indexing status section
index_status = inspection.get("indexStatusResult", {})
verdict = index_status.get("verdict", "UNKNOWN")
result_lines.append(f"Indexing Status: {verdict}")
# Coverage state
if "coverageState" in index_status:
result_lines.append(f"Coverage: {index_status['coverageState']}")
# Last crawl
if "lastCrawlTime" in index_status:
try:
crawl_time = datetime.fromisoformat(index_status["lastCrawlTime"].replace('Z', '+00:00'))
result_lines.append(f"Last Crawled: {crawl_time.strftime('%Y-%m-%d %H:%M')}")
            except ValueError:
result_lines.append(f"Last Crawled: {index_status['lastCrawlTime']}")
# Page fetch
if "pageFetchState" in index_status:
result_lines.append(f"Page Fetch: {index_status['pageFetchState']}")
# Robots.txt status
if "robotsTxtState" in index_status:
result_lines.append(f"Robots.txt: {index_status['robotsTxtState']}")
# Indexing state
if "indexingState" in index_status:
result_lines.append(f"Indexing State: {index_status['indexingState']}")
# Canonical information
if "googleCanonical" in index_status:
result_lines.append(f"Google Canonical: {index_status['googleCanonical']}")
if "userCanonical" in index_status and index_status.get("userCanonical") != index_status.get("googleCanonical"):
result_lines.append(f"User Canonical: {index_status['userCanonical']}")
# Crawled as
if "crawledAs" in index_status:
result_lines.append(f"Crawled As: {index_status['crawledAs']}")
# Referring URLs
if "referringUrls" in index_status and index_status["referringUrls"]:
result_lines.append("\nReferring URLs:")
for url in index_status["referringUrls"][:5]: # Limit to 5 examples
result_lines.append(f"- {url}")
if len(index_status["referringUrls"]) > 5:
result_lines.append(f"... and {len(index_status['referringUrls']) - 5} more")
# Rich results
if "richResultsResult" in inspection:
rich = inspection["richResultsResult"]
result_lines.append(f"\nRich Results: {rich.get('verdict', 'UNKNOWN')}")
if "detectedItems" in rich and rich["detectedItems"]:
result_lines.append("Detected Rich Result Types:")
for item in rich["detectedItems"]:
rich_type = item.get("richResultType", "Unknown")
result_lines.append(f"- {rich_type}")
# If there are items with names, show them
if "items" in item and item["items"]:
for i, subitem in enumerate(item["items"][:3]): # Limit to 3 examples
if "name" in subitem:
result_lines.append(f" • {subitem['name']}")
if len(item["items"]) > 3:
result_lines.append(f" • ... and {len(item['items']) - 3} more items")
# Check for issues
if "richResultsIssues" in rich and rich["richResultsIssues"]:
result_lines.append("\nRich Results Issues:")
for issue in rich["richResultsIssues"]:
severity = issue.get("severity", "Unknown")
message = issue.get("message", "Unknown issue")
result_lines.append(f"- [{severity}] {message}")
return "\n".join(result_lines)
except Exception as e:
return f"Error inspecting URL: {str(e)}"
@mcp.tool()
async def batch_url_inspection(site_url: str, urls: str) -> str:
"""
Inspect multiple URLs in batch (within API limits).
Args:
site_url: The URL of the site in Search Console (must be exact match, for domain properties use format: sc-domain:example.com)
urls: List of URLs to inspect, one per line
"""
try:
service = get_gsc_service()
# Parse URLs
url_list = [url.strip() for url in urls.split('\n') if url.strip()]
if not url_list:
return "No URLs provided for inspection."
if len(url_list) > 10:
return f"Too many URLs provided ({len(url_list)}). Please limit to 10 URLs per batch to avoid API quota issues."
# Process each URL
results = []
for page_url in url_list:
# Build request
request = {
"inspectionUrl": page_url,
"siteUrl": site_url
}
try:
                # Pause briefly between requests to stay clear of URL Inspection rate limits
                await asyncio.sleep(0.5)
                response = service.urlInspection().index().inspect(body=request).execute()
if not response or "inspectionResult" not in response:
results.append(f"{page_url}: No inspection data found")
continue
inspection = response["inspectionResult"]
index_status = inspection.get("indexStatusResult", {})
# Get key information
verdict = index_status.get("verdict", "UNKNOWN")
coverage = index_status.get("coverageState", "Unknown")
last_crawl = "Never"
if "lastCrawlTime" in index_status:
try:
crawl_time = datetime.fromisoformat(index_status["lastCrawlTime"].replace('Z', '+00:00'))
last_crawl = crawl_time.strftime('%Y-%m-%d')
                    except ValueError:
last_crawl = index_status["lastCrawlTime"]
# Check for rich results
rich_results = "None"
if "richResultsResult" in inspection:
rich = inspection["richResultsResult"]
if rich.get("verdict") == "PASS" and "detectedItems" in rich and rich["detectedItems"]:
rich_types = [item.get("richResultType", "Unknown") for item in rich["detectedItems"]]
rich_results = ", ".join(rich_types)
# Format result
results.append(f"{page_url}:\n Status: {verdict} - {coverage}\n Last Crawl: {last_crawl}\n Rich Results: {rich_results}\n")
except Exception as e:
results.append(f"{page_url}: Error - {str(e)}")
# Combine results
return f"Batch URL Inspection Results for {site_url}:\n\n" + "\n".join(results)
except Exception as e:
return f"Error performing batch inspection: {str(e)}"
@mcp.tool()
async def check_indexing_issues(site_url: str, urls: str) -> str:
"""
Check for specific indexing issues across multiple URLs.
Args:
site_url: The URL of the site in Search Console (must be exact match, for domain properties use format: sc-domain:example.com)
urls: List of URLs to check, one per line
"""
try:
service = get_gsc_service()
# Parse URLs
url_list = [url.strip() for url in urls.split('\n') if url.strip()]
if not url_list:
return "No URLs provided for inspection."
if len(url_list) > 10:
return f"Too many URLs provided ({len(url_list)}). Please limit to 10 URLs per batch to avoid API quota issues."
# Track issues by category
issues_summary = {
"not_indexed": [],
"canonical_issues": [],
"robots_blocked": [],
"fetch_issues": [],
"indexed": []
}
# Process each URL
for page_url in url_list:
# Build request
request = {
"inspectionUrl": page_url,
"siteUrl": site_url
}
try:
# Execute request
response = service.urlInspection().index().inspect(body=request).execute()
if not response or "inspectionResult" not in response:
issues_summary["not_indexed"].append(f"{page_url} - No inspection data found")
continue
inspection = response["inspectionResult"]
index_status = inspection.get("indexStatusResult", {})
# Check indexing status
verdict = index_status.get("verdict", "UNKNOWN")
coverage = index_status.get("coverageState", "Unknown")
if verdict != "PASS" or "not indexed" in coverage.lower() or "excluded" in coverage.lower():
issues_summary["not_indexed"].append(f"{page_url} - {coverage}")
else:
issues_summary["indexed"].append(page_url)
# Check canonical issues
google_canonical = index_status.get("googleCanonical", "")
user_canonical = index_status.get("userCanonical", "")
if google_canonical and user_canonical and google_canonical != user_canonical:
issues_summary["canonical_issues"].append(
f"{page_url} - Google chose: {google_canonical} instead of user-declared: {user_canonical}"
)
# Check robots.txt status
robots_state = index_status.get("robotsTxtState", "")
if robots_state == "BLOCKED":
issues_summary["robots_blocked"].append(page_url)
# Check fetch issues
fetch_state = index_status.get("pageFetchState", "")
if fetch_state != "SUCCESSFUL":
issues_summary["fetch_issues"].append(f"{page_url} - {fetch_state}")
except Exception as e:
issues_summary["not_indexed"].append(f"{page_url} - Error: {str(e)}")
# Format results
result_lines = [f"Indexing Issues Report for {site_url}:"]
result_lines.append("-" * 80)
# Summary counts
result_lines.append(f"Total URLs checked: {len(url_list)}")
result_lines.append(f"Indexed: {len(issues_summary['indexed'])}")
result_lines.append(f"Not indexed: {len(issues_summary['not_indexed'])}")
result_lines.append(f"Canonical issues: {len(issues_summary['canonical_issues'])}")
result_lines.append(f"Robots.txt blocked: {len(issues_summary['robots_blocked'])}")
result_lines.append(f"Fetch issues: {len(issues_summary['fetch_issues'])}")
result_lines.append("-" * 80)
# Detailed issues
if issues_summary["not_indexed"]:
result_lines.append("\nNot Indexed URLs:")
for issue in issues_summary["not_indexed"]:
result_lines.append(f"- {issue}")
if issues_summary["canonical_issues"]:
result_lines.append("\nCanonical Issues:")
for issue in issues_summary["canonical_issues"]:
result_lines.append(f"- {issue}")
if issues_summary["robots_blocked"]:
result_lines.append("\nRobots.txt Blocked URLs:")
for url in issues_summary["robots_blocked"]:
result_lines.append(f"- {url}")
if issues_summary["fetch_issues"]:
result_lines.append("\nFetch Issues:")
for issue in issues_summary["fetch_issues"]:
result_lines.append(f"- {issue}")
return "\n".join(result_lines)
except Exception as e:
return f"Error checking indexing issues: {str(e)}"
@mcp.tool()
async def get_performance_overview(site_url: str, days: int = 28) -> str:
"""
Get a performance overview for a specific property.
Args:
site_url: The URL of the site in Search Console (must be exact match)
days: Number of days to look back (default: 28)
"""
try:
service = get_gsc_service()
# Calculate date range
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days)
# Get total metrics
total_request = {
"startDate": start_date.strftime("%Y-%m-%d"),
"endDate": end_date.strftime("%Y-%m-%d"),
"dimensions": [], # No dimensions for totals
"rowLimit": 1
}
total_response = service.searchanalytics().query(siteUrl=site_url, body=total_request).execute()
# Get by date for trend
date_request = {
"startDate": start_date.strftime("%Y-%m-%d"),
"endDate": end_date.strftime("%Y-%m-%d"),
"dimensions": ["date"],
"rowLimit": days
}
date_response = service.searchanalytics().query(siteUrl=site_url, body=date_request).execute()
# Format results
result_lines = [f"Performance Overview for {site_url} (last {days} days):"]
result_lines.append("-" * 80)
# Add total metrics
if total_response.get("rows"):
row = total_response["rows"][0]
result_lines.append(f"Total Clicks: {row.get('clicks', 0):,}")
result_lines.append(f"Total Impressions: {row.get('impressions', 0):,}")
result_lines.append(f"Average CTR: {row.get('ctr', 0) * 100:.2f}%")
result_lines.append(f"Average Position: {row.get('position', 0):.1f}")
else:
result_lines.append("No data available for the selected period.")
return "\n".join(result_lines)
# Add trend data
if date_response.get("rows"):
result_lines.append("\nDaily Trend:")
result_lines.append("Date | Clicks | Impressions | CTR | Position")
result_lines.append("-" * 80)
# Sort by date
sorted_rows = sorted(date_response["rows"], key=lambda x: x["keys"][0])
for row in sorted_rows:
date_str = row["keys"][0]
# Format date from YYYY-MM-DD to MM/DD
try:
date_obj = datetime.strptime(date_str, "%Y-%m-%d")
date_formatted = date_obj.strftime("%m/%d")
                except ValueError:
date_formatted = date_str
clicks = row.get("clicks", 0)
impressions = row.get("impressions", 0)
ctr = row.get("ctr", 0) * 100
position = row.get("position", 0)
result_lines.append(f"{date_formatted} | {clicks:.0f} | {impressions:.0f} | {ctr:.2f}% | {position:.1f}")
return "\n".join(result_lines)
except Exception as e:
return f"Error retrieving performance overview: {str(e)}"
@mcp.tool()
async def get_advanced_search_analytics(
site_url: str,
start_date: str = None,
end_date: str = None,
dimensions: str = "query",
search_type: str = "WEB",
row_limit: int = 1000,
start_row: int = 0,
sort_by: str = "clicks",
sort_direction: str = "descending",
filter_dimension: str = None,
filter_operator: str = "contains",
filter_expression: str = None
) -> str:
"""
Get advanced search analytics data with sorting, filtering, and pagination.
Args:
site_url: The URL of the site in Search Console (must be exact match)
start_date: Start date in YYYY-MM-DD format (defaults to 28 days ago)
end_date: End date in YYYY-MM-DD format (defaults to today)
dimensions: Dimensions to group by, comma-separated (e.g., "query,page,device")
search_type: Type of search results (WEB, IMAGE, VIDEO, NEWS, DISCOVER)
row_limit: Maximum number of rows to return (max 25000)
start_row: Starting row for pagination
sort_by: Metric to sort by (clicks, impressions, ctr, position)
sort_direction: Sort direction (ascending or descending)
filter_dimension: Dimension to filter on (query, page, country, device)
filter_operator: Filter operator (contains, equals, notContains, notEquals)
filter_expression: Filter expression value
"""
try:
service = get_gsc_service()
# Calculate date range if not provided
if not end_date:
end_date = datetime.now().date().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now().date() - timedelta(days=28)).strftime("%Y-%m-%d")
# Parse dimensions
dimension_list = [d.strip() for d in dimensions.split(",")]
# Build request
request = {
"startDate": start_date,
"endDate": end_date,
"dimensions": dimension_list,
"rowLimit": min(row_limit, 25000), # Cap at API maximum
"startRow": start_row,
"searchType": search_type.upper()
}
# Add sorting
if sort_by:
metric_map = {
"clicks": "CLICK_COUNT",
"impressions": "IMPRESSION_COUNT",
"ctr": "CTR",
"position": "POSITION"
}
if sort_by in metric_map:
request["orderBy"] = [{
"metric": metric_map[sort_by],
"direction": sort_direction.lower()
}]
# Add filtering if provided
if filter_dimension and filter_expression:
filter_group = {
"filters": [{
"dimension": filter_dimension,
"operator": filter_operator,
"expression": filter_expression
}]
}
request["dimensionFilterGroups"] = [filter_group]
# Execute request
response = service.searchanalytics().query(siteUrl=site_url, body=request).execute()
if not response.get("rows"):
return (f"No search analytics data found for {site_url} with the specified parameters.\n\n"
f"Parameters used:\n"
f"- Date range: {start_date} to {end_date}\n"
f"- Dimensions: {dimensions}\n"
f"- Search type: {search_type}\n"
f"- Filter: {filter_dimension} {filter_operator} '{filter_expression}'" if filter_dimension else "- No filter applied")
# Format results
result_lines = [f"Search analytics for {site_url}:"]
result_lines.append(f"Date range: {start_date} to {end_date}")
result_lines.append(f"Search type: {search_type}")
if filter_dimension:
result_lines.append(f"Filter: {filter_dimension} {filter_operator} '{filter_expression}'")
result_lines.append(f"Showing rows {start_row+1} to {start_row+len(response.get('rows', []))} (sorted by {sort_by} {sort_direction})")
result_lines.append("\n" + "-" * 80 + "\n")
# Create header based on dimensions
header = []
for dim in dimension_list:
header.append(dim.capitalize())
header.extend(["Clicks", "Impressions", "CTR", "Position"])
result_lines.append(" | ".join(header))
result_lines.append("-" * 80)
# Add data rows
for row in response.get("rows", []):
data = []
# Add dimension values
for dim_value in row.get("keys", []):
                data.append(dim_value[:100])  # Truncate long dimension values to 100 characters
# Add metrics
data.append(str(row.get("clicks", 0)))
data.append(str(row.get("impressions", 0)))
data.append(f"{row.get('ctr', 0) * 100:.2f}%")
data.append(f"{row.get('position', 0):.1f}")
result_lines.append(" | ".join(data))
# Add pagination info if there might be more results
if len(response.get("rows", [])) == row_limit:
next_start = start_row + row_limit
result_lines.append("\nThere may be more results available. To see the next page, use:")
result_lines.append(f"start_row: {next_start}, row_limit: {row_limit}")
return "\n".join(result_lines)
except Exception as e:
return f"Error retrieving advanced search analytics: {str(e)}"
@mcp.tool()
async def compare_search_periods(
site_url: str,
period1_start: str,
period1_end: str,
period2_start: str,
period2_end: str,
dimensions: str = "query",
limit: int = 10
) -> str:
"""
Compare search analytics data between two time periods.
Args:
site_url: The URL of the site in Search Console (must be exact match)
period1_start: Start date for period 1 (YYYY-MM-DD)
period1_end: End date for period 1 (YYYY-MM-DD)
period2_start: Start date for period 2 (YYYY-MM-DD)
period2_end: End date for period 2 (YYYY-MM-DD)
dimensions: Dimensions to group by (default: query)
limit: Number of top results to compare (default: 10)
"""
try:
service = get_gsc_service()
# Parse dimensions
dimension_list = [d.strip() for d in dimensions.split(",")]
# Build requests for both periods
period1_request = {
"startDate": period1_start,
"endDate": period1_end,
"dimensions": dimension_list,
"rowLimit": 1000 # Get more to ensure we can match items between periods
}
period2_request = {
"startDate": period2_start,
"endDate": period2_end,
"dimensions": dimension_list,
"rowLimit": 1000
}
# Execute requests
period1_response = service.searchanalytics().query(siteUrl=site_url, body=period1_request).execute()
period2_response = service.searchanalytics().query(siteUrl=site_url, body=period2_request).execute()
period1_rows = period1_response.get("rows", [])
period2_rows = period2_response.get("rows", [])
if not period1_rows and not period2_rows:
return f"No data found for either period for {site_url}."
# Create dictionaries for easy lookup
period1_data = {tuple(row.get("keys", [])): row for row in period1_rows}
period2_data = {tuple(row.get("keys", [])): row for row in period2_rows}
# Find common keys and calculate differences
all_keys = set(period1_data.keys()) | set(period2_data.keys())
comparison_data = []
for key in all_keys:
p1_row = period1_data.get(key, {"clicks": 0, "impressions": 0, "ctr": 0, "position": 0})
p2_row = period2_data.get(key, {"clicks": 0, "impressions": 0, "ctr": 0, "position": 0})
# Calculate differences
click_diff = p2_row.get("clicks", 0) - p1_row.get("clicks", 0)
click_pct = (click_diff / p1_row.get("clicks", 1)) * 100 if p1_row.get("clicks", 0) > 0 else float('inf')
imp_diff = p2_row.get("impressions", 0) - p1_row.get("impressions", 0)
imp_pct = (imp_diff / p1_row.get("impressions", 1)) * 100 if p1_row.get("impressions", 0) > 0 else float('inf')
ctr_diff = p2_row.get("ctr", 0) - p1_row.get("ctr", 0)
pos_diff = p1_row.get("position", 0) - p2_row.get("position", 0) # Note: lower position is better
comparison_data.append({
"key": key,
"p1_clicks": p1_row.get("clicks", 0),
"p2_clicks": p2_row.get("clicks", 0),
"click_diff": click_diff,
"click_pct": click_pct,
"p1_impressions": p1_row.get("impressions", 0),
"p2_impressions": p2_row.get("impressions", 0),
"imp_diff": imp_diff,
"imp_pct": imp_pct,
"p1_ctr": p1_row.get("ctr", 0),
"p2_ctr": p2_row.get("ctr", 0),
"ctr_diff": ctr_diff,
"p1_position": p1_row.get("position", 0),
"p2_position": p2_row.get("position", 0),
"pos_diff": pos_diff
})
# Sort by absolute click difference (can change to other metrics)
comparison_data.sort(key=lambda x: abs(x["click_diff"]), reverse=True)
# Format results
result_lines = [f"Search analytics comparison for {site_url}:"]
result_lines.append(f"Period 1: {period1_start} to {period1_end}")
result_lines.append(f"Period 2: {period2_start} to {period2_end}")
result_lines.append(f"Dimension(s): {dimensions}")
result_lines.append(f"Top {min(limit, len(comparison_data))} results by change in clicks:")
result_lines.append("\n" + "-" * 100 + "\n")
# Create header
dim_header = " | ".join([d.capitalize() for d in dimension_list])
result_lines.append(f"{dim_header} | P1 Clicks | P2 Clicks | Change | % | P1 Pos | P2 Pos | Pos Δ")
result_lines.append("-" * 100)
# Add data rows (limited to requested number)
for item in comparison_data[:limit]:
key_str = " | ".join([str(k)[:100] for k in item["key"]])
            # Format the click change and percentage change
            click_change = item["click_diff"]
            click_pct_str = f"{item['click_pct']:.1f}%" if item["click_pct"] != float('inf') else "N/A"
            # Position change (positive means the average position improved)
            pos_change = item["pos_diff"]
            result_lines.append(
                f"{key_str} | {item['p1_clicks']} | {item['p2_clicks']} | "
                f"{click_change:+.0f} | {click_pct_str} | "
                f"{item['p1_position']:.1f} | {item['p2_position']:.1f} | {pos_change:+.1f}"
            )
return "\n".join(result_lines)
except Exception as e:
return f"Error comparing search periods: {str(e)}"
@mcp.tool()
async def get_search_by_page_query(
site_url: str,
page_url: str,
days: int = 28
) -> str:
"""
Get search analytics data for a specific page, broken down by query.
Args:
site_url: The URL of the site in Search Console (must be exact match)
page_url: The specific page URL to analyze
days: Number of days to look back (default: 28)
"""
try:
service = get_gsc_service()
# Calculate date range
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days)
# Build request with page filter
request = {
"startDate": start_date.strftime("%Y-%m-%d"),
"endDate": end_date.strftime("%Y-%m-%d"),
"dimensions": ["query"],
"dimensionFilterGroups": [{
"filters": [{
"dimension": "page",
"operator": "equals",
"expression": page_url
}]
}],
"rowLimit": 20, # Top 20 queries for this page
"orderBy": [{"metric": "CLICK_COUNT", "direction": "descending"}]
}
# Execute request
response = service.searchanalytics().query(siteUrl=site_url, body=request).execute()
if not response.get("rows"):
return f"No search data found for page {page_url} in the last {days} days."
# Format results
result_lines = [f"Search queries for page {page_url} (last {days} days):"]
result_lines.append("\n" + "-" * 80 + "\n")
# Create header
result_lines.append("Query | Clicks | Impressions | CTR | Position")
result_lines.append("-" * 80)
# Add data rows
for row in response.get("rows", []):
query = row.get("keys", ["Unknown"])[0]
clicks = row.get("clicks", 0)
impressions = row.get("impressions", 0)
ctr = row.get("ctr", 0) * 100
position = row.get("position", 0)
result_lines.append(f"{query[:100]} | {clicks} | {impressions} | {ctr:.2f}% | {position:.1f}")
# Add total metrics
total_clicks = sum(row.get("clicks", 0) for row in response.get("rows", []))
total_impressions = sum(row.get("impressions", 0) for row in response.get("rows", []))
avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0
result_lines.append("-" * 80)
result_lines.append(f"TOTAL | {total_clicks} | {total_impressions} | {avg_ctr:.2f}% | -")
return "\n".join(result_lines)
except Exception as e:
return f"Error retrieving page query data: {str(e)}"
@mcp.tool()
async def list_sitemaps_enhanced(site_url: str, sitemap_index: str = None) -> str:
"""
List all sitemaps for a specific Search Console property with detailed information.
Args:
site_url: The URL of the site in Search Console (must be exact match)
sitemap_index: Optional sitemap index URL to list child sitemaps
"""
try:
service = get_gsc_service()
# Get sitemaps list
if sitemap_index:
sitemaps = service.sitemaps().list(siteUrl=site_url, sitemapIndex=sitemap_index).execute()
source = f"child sitemaps from index: {sitemap_index}"
else:
sitemaps = service.sitemaps().list(siteUrl=site_url).execute()
source = "all submitted sitemaps"
if not sitemaps.get("sitemap"):
return f"No sitemaps found for {site_url}" + (f" in index {sitemap_index}" if sitemap_index else ".")
# Format the results
result_lines = [f"Sitemaps for {site_url} ({source}):"]
result_lines.append("-" * 100)
# Header
result_lines.append("Path | Last Submitted | Last Downloaded | Type | URLs | Errors | Warnings")
result_lines.append("-" * 100)
# Add each sitemap
for sitemap in sitemaps.get("sitemap", []):
path = sitemap.get("path", "Unknown")
# Format dates
last_submitted = sitemap.get("lastSubmitted", "Never")
if last_submitted != "Never":
try:
dt = datetime.fromisoformat(last_submitted.replace('Z', '+00:00'))
last_submitted = dt.strftime("%Y-%m-%d %H:%M")
                except ValueError:
pass
last_downloaded = sitemap.get("lastDownloaded", "Never")
if last_downloaded != "Never":
try:
dt = datetime.fromisoformat(last_downloaded.replace('Z', '+00:00'))
last_downloaded = dt.strftime("%Y-%m-%d %H:%M")
                except ValueError:
pass
# Determine type
sitemap_type = "Index" if sitemap.get("isSitemapsIndex", False) else "Sitemap"
# Get counts
errors = sitemap.get("errors", 0)
warnings = sitemap.get("warnings", 0)
# Get URL counts
url_count = "N/A"
if "contents" in sitemap:
for content in sitemap["contents"]:
if content.get("type") == "web":
url_count = content.get("submitted", "0")
break
result_lines.append(f"{path} | {last_submitted} | {last_downloaded} | {sitemap_type} | {url_count} | {errors} | {warnings}")
# Add processing status if available
pending_count = sum(1 for sitemap in sitemaps.get("sitemap", []) if sitemap.get("isPending", False))
if pending_count > 0:
result_lines.append(f"\nNote: {pending_count} sitemaps are still pending processing by Google.")
return "\n".join(result_lines)
except Exception as e:
return f"Error retrieving sitemaps: {str(e)}"
@mcp.tool()
async def get_sitemap_details(site_url: str, sitemap_url: str) -> str:
"""
Get detailed information about a specific sitemap.
Args:
site_url: The URL of the site in Search Console (must be exact match)
sitemap_url: The full URL of the sitemap to inspect
"""
try:
service = get_gsc_service()
# Get sitemap details
details = service.sitemaps().get(siteUrl=site_url, feedpath=sitemap_url).execute()
if not details:
return f"No details found for sitemap {sitemap_url}."
# Format the results
result_lines = [f"Sitemap Details for {sitemap_url}:"]
result_lines.append("-" * 80)
# Basic info
is_index = details.get("isSitemapsIndex", False)
result_lines.append(f"Type: {'Sitemap Index' if is_index else 'Sitemap'}")
# Status
is_pending = details.get("isPending", False)
result_lines.append(f"Status: {'Pending processing' if is_pending else 'Processed'}")
# Dates
if "lastSubmitted" in details:
try:
dt = datetime.fromisoformat(details["lastSubmitted"].replace('Z', '+00:00'))
result_lines.append(f"Last Submitted: {dt.strftime('%Y-%m-%d %H:%M')}")
            except ValueError:
result_lines.append(f"Last Submitted: {details['lastSubmitted']}")
if "lastDownloaded" in details:
try:
dt = datetime.fromisoformat(details["lastDownloaded"].replace('Z', '+00:00'))
result_lines.append(f"Last Downloaded: {dt.strftime('%Y-%m-%d %H:%M')}")
            except ValueError:
result_lines.append(f"Last Downloaded: {details['lastDownloaded']}")
# Errors and warnings
result_lines.append(f"Errors: {details.get('errors', 0)}")
result_lines.append(f"Warnings: {details.get('warnings', 0)}")
# Content breakdown
if "contents" in details and details["contents"]:
result_lines.append("\nContent Breakdown:")
for content in details["contents"]:
content_type = content.get("type", "Unknown").upper()
submitted = content.get("submitted", 0)
indexed = content.get("indexed", "N/A")
result_lines.append(f"- {content_type}: {submitted} submitted, {indexed} indexed")
# If it's an index, suggest how to list child sitemaps
if is_index:
result_lines.append("\nThis is a sitemap index. To list child sitemaps, use:")
result_lines.append(f"list_sitemaps_enhanced with sitemap_index={sitemap_url}")
return "\n".join(result_lines)
except Exception as e:
return f"Error retrieving sitemap details: {str(e)}"
@mcp.tool()
async def submit_sitemap(site_url: str, sitemap_url: str) -> str:
"""
Submit a new sitemap or resubmit an existing one to Google.
Args:
site_url: The URL of the site in Search Console (must be exact match)
sitemap_url: The full URL of the sitemap to submit
"""
try:
service = get_gsc_service()
# Submit the sitemap
service.sitemaps().submit(siteUrl=site_url, feedpath=sitemap_url).execute()
# Verify submission by getting details
try:
details = service.sitemaps().get(siteUrl=site_url, feedpath=sitemap_url).execute()
# Format response
result_lines = [f"Successfully submitted sitemap: {sitemap_url}"]
# Add submission time if available
if "lastSubmitted" in details:
try:
dt = datetime.fromisoformat(details["lastSubmitted"].replace('Z', '+00:00'))
result_lines.append(f"Submission time: {dt.strftime('%Y-%m-%d %H:%M')}")
                except ValueError:
result_lines.append(f"Submission time: {details['lastSubmitted']}")
# Add processing status
is_pending = details.get("isPending", True)
result_lines.append(f"Status: {'Pending processing' if is_pending else 'Processing started'}")
# Add note about processing time
result_lines.append("\nNote: Google may take some time to process the sitemap. Check back later for full details.")
return "\n".join(result_lines)
        except Exception:
# If we can't get details, just return basic success message
return f"Successfully submitted sitemap: {sitemap_url}\n\nGoogle will queue it for processing."
except Exception as e:
return f"Error submitting sitemap: {str(e)}"
@mcp.tool()
async def delete_sitemap(site_url: str, sitemap_url: str) -> str:
"""
Delete (unsubmit) a sitemap from Google Search Console.
Args:
site_url: The URL of the site in Search Console (must be exact match)
sitemap_url: The full URL of the sitemap to delete
"""
try:
service = get_gsc_service()
# First check if the sitemap exists
try:
service.sitemaps().get(siteUrl=site_url, feedpath=sitemap_url).execute()
        except HttpError as e:
            if e.resp.status == 404:
                return f"Sitemap not found: {sitemap_url}. It may have already been deleted or was never submitted."
            raise
# Delete the sitemap
service.sitemaps().delete(siteUrl=site_url, feedpath=sitemap_url).execute()
return f"Successfully deleted sitemap: {sitemap_url}\n\nNote: This only removes the sitemap from Search Console. Any URLs already indexed will remain in Google's index."
except Exception as e:
return f"Error deleting sitemap: {str(e)}"
@mcp.tool()
async def manage_sitemaps(site_url: str, action: str, sitemap_url: str = None, sitemap_index: str = None) -> str:
"""
All-in-one tool to manage sitemaps (list, get details, submit, delete).
Args:
site_url: The URL of the site in Search Console (must be exact match)
action: The action to perform (list, details, submit, delete)
sitemap_url: The full URL of the sitemap (required for details, submit, delete)
sitemap_index: Optional sitemap index URL for listing child sitemaps (only used with 'list' action)
"""
try:
# Validate inputs
action = action.lower().strip()
valid_actions = ["list", "details", "submit", "delete"]
if action not in valid_actions:
return f"Invalid action: {action}. Please use one of: {', '.join(valid_actions)}"
if action in ["details", "submit", "delete"] and not sitemap_url:
return f"The {action} action requires a sitemap_url parameter."
# Perform the requested action
if action == "list":
return await list_sitemaps_enhanced(site_url, sitemap_index)
elif action == "details":
return await get_sitemap_details(site_url, sitemap_url)
elif action == "submit":
return await submit_sitemap(site_url, sitemap_url)
elif action == "delete":
return await delete_sitemap(site_url, sitemap_url)
except Exception as e:
return f"Error managing sitemaps: {str(e)}"
@mcp.tool()
async def get_creator_info() -> str:
"""
Provides information about Amin Foroutan, the creator of the MCP-GSC tool.
"""
creator_info = """
# About the Creator: Amin Foroutan
Amin Foroutan is an SEO consultant with over a decade of experience, specializing in technical SEO, Python-driven tools, and data analysis for SEO performance.
## Connect with Amin:
- **LinkedIn**: [Amin Foroutan](https://www.linkedin.com/in/ma-foroutan/)
- **Personal Website**: [aminforoutan.com](https://aminforoutan.com/)
- **YouTube**: [Amin Forout](https://www.youtube.com/channel/UCW7tPXg-rWdH4YzLrcAdBIw)
- **X (Twitter)**: [@aminfseo](https://x.com/aminfseo)
## Notable Projects:
Amin has created several popular SEO tools including:
- Advanced GSC Visualizer (6.4K+ users)
- SEO Render Insight Tool (3.5K+ users)
- Google AI Overview Impact Analysis (1.2K+ users)
- Google AI Overview Citation Analysis (900+ users)
- SEMRush Enhancer (570+ users)
- SEO Page Inspector (115+ users)
## Expertise:
Amin combines technical SEO knowledge with programming skills to create innovative solutions for SEO challenges.
"""
return creator_info
if __name__ == "__main__":
# Start the MCP server on stdio transport
mcp.run(transport="stdio")
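# One possible MCP client configuration for this server (paths are placeholders; the exact
# file and schema depend on your client, e.g. Claude Desktop's claude_desktop_config.json):
#   {
#     "mcpServers": {
#       "gsc": {
#         "command": "python",
#         "args": ["/absolute/path/to/gsc_server.py"],
#         "env": {"GSC_CREDENTIALS_PATH": "/absolute/path/to/service_account_credentials.json"}
#       }
#     }
#   }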