# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
Google PageSpeed Insights API Client.
Provides async methods to analyze web pages using the PageSpeed Insights API v5.
Returns structured results including Lighthouse scores and Core Web Vitals.
API Documentation: https://developers.google.com/speed/docs/insights/v5/get-started
"""
import os
import asyncio
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from urllib.parse import urlencode
import aiohttp
from ..utils.progress import logger
@dataclass
class CoreWebVitals:
    """Core Web Vitals metrics.

    All ``*_ms`` fields are millisecond durations; ``cls`` is the unitless
    Cumulative Layout Shift score. A ``None`` value means the metric was
    absent from the Lighthouse audit results.
    """
    lcp_ms: Optional[int] = None           # Largest Contentful Paint
    fid_ms: Optional[int] = None           # First Input Delay (legacy metric)
    inp_ms: Optional[int] = None           # Interaction to Next Paint
    cls: Optional[float] = None            # Cumulative Layout Shift (unitless)
    fcp_ms: Optional[int] = None           # First Contentful Paint
    ttfb_ms: Optional[int] = None          # Time to First Byte
    tbt_ms: Optional[int] = None           # Total Blocking Time
    speed_index_ms: Optional[int] = None   # Speed Index
@dataclass
class LighthouseScores:
    """Lighthouse category scores.

    Each score is an integer in the 0-100 range (the API's 0.0-1.0 category
    score scaled by 100), or ``None`` when the category was not analyzed.
    """
    performance: Optional[int] = None
    accessibility: Optional[int] = None
    best_practices: Optional[int] = None
    seo: Optional[int] = None
@dataclass
class PageSpeedResult:
    """Complete PageSpeed Insights result for one URL/strategy run."""
    url: str
    strategy: str  # 'mobile' or 'desktop'
    scores: LighthouseScores
    vitals: CoreWebVitals
    lighthouse_version: Optional[str] = None
    fetch_time: Optional[str] = None
    raw_response: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    @property
    def is_success(self) -> bool:
        """True when no error was recorded and a performance score exists."""
        has_score = self.scores.performance is not None
        return self.error is None and has_score

    def to_dict(self) -> Dict[str, Any]:
        """Flatten scores and vitals into a plain dict for database storage."""
        row: Dict[str, Any] = {
            'url': self.url,
            'strategy': self.strategy,
        }
        row.update({
            'performance_score': self.scores.performance,
            'accessibility_score': self.scores.accessibility,
            'best_practices_score': self.scores.best_practices,
            'seo_score': self.scores.seo,
        })
        row.update({
            'lcp_ms': self.vitals.lcp_ms,
            'fid_ms': self.vitals.fid_ms,
            'inp_ms': self.vitals.inp_ms,
            'cls': self.vitals.cls,
            'fcp_ms': self.vitals.fcp_ms,
            'ttfb_ms': self.vitals.ttfb_ms,
            'tbt_ms': self.vitals.tbt_ms,
            'speed_index_ms': self.vitals.speed_index_ms,
        })
        return row
# Core Web Vitals thresholds (Google's standards)
# https://web.dev/vitals/
CWV_THRESHOLDS = {
    'lcp': {
        'good': 2500,               # <= 2.5s
        'needs_improvement': 4000,  # <= 4s
    },
    'fid': {
        'good': 100,                # <= 100ms
        'needs_improvement': 300,   # <= 300ms
    },
    'inp': {
        'good': 200,                # <= 200ms
        'needs_improvement': 500,   # <= 500ms
    },
    'cls': {
        'good': 0.1,                # <= 0.1
        'needs_improvement': 0.25,  # <= 0.25
    },
}


def get_cwv_status(metric: str, value: float) -> str:
    """Classify a Core Web Vital value against Google's thresholds.

    Returns 'good', 'needs_improvement', or 'poor'; 'unknown' for a metric
    name that has no thresholds defined.
    """
    thresholds = CWV_THRESHOLDS.get(metric)
    if thresholds is None:
        return 'unknown'
    if value <= thresholds['good']:
        return 'good'
    if value <= thresholds['needs_improvement']:
        return 'needs_improvement'
    return 'poor'
class PageSpeedClient:
    """
    Client for Google PageSpeed Insights API v5.

    Usage:
        client = PageSpeedClient(api_key='...')
        result = await client.analyze('https://example.com', strategy='mobile')
        print(result.scores.performance)
    """

    API_URL = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed"

    # Rate limits: 1 query/sec without key, 400 queries/100 sec with key
    DEFAULT_RATE_LIMIT = 1.0  # seconds between requests

    def __init__(
        self,
        api_key: Optional[str] = None,
        timeout: int = 120,  # PageSpeed can take a while
    ):
        """
        Initialize PageSpeed client.

        Args:
            api_key: Google API key (optional but recommended for higher
                quota). Falls back to the PAGESPEED_API_KEY env variable.
            timeout: Request timeout in seconds.
        """
        self.api_key = api_key or os.getenv('PAGESPEED_API_KEY')
        self.timeout = timeout
        self._last_request_time = 0.0

    async def _rate_limit(self):
        """Enforce DEFAULT_RATE_LIMIT seconds between consecutive requests."""
        # get_running_loop() is the supported way to reach the loop clock from
        # inside a coroutine; get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        elapsed = loop.time() - self._last_request_time
        if elapsed < self.DEFAULT_RATE_LIMIT:
            await asyncio.sleep(self.DEFAULT_RATE_LIMIT - elapsed)
        self._last_request_time = loop.time()

    def _extract_metric_value(
        self,
        lighthouse_result: Dict[str, Any],
        audit_id: str,
        numeric: bool = True
    ) -> Optional[float]:
        """Extract a metric value from Lighthouse audit results.

        Args:
            lighthouse_result: The 'lighthouseResult' object of the response.
            audit_id: Lighthouse audit key, e.g. 'largest-contentful-paint'.
            numeric: When True return 'numericValue', else 'displayValue'.

        Returns:
            The value, or None when the audit or field is missing.
        """
        try:
            audits = lighthouse_result.get('audits', {})
            audit = audits.get(audit_id, {})
            if numeric:
                return audit.get('numericValue')
            return audit.get('displayValue')
        except Exception:
            # Defensive: malformed responses (e.g. non-dict audits) yield None.
            return None

    def _extract_score(
        self,
        lighthouse_result: Dict[str, Any],
        category: str
    ) -> Optional[int]:
        """Extract a category score (0-100) from Lighthouse results.

        The API reports scores as 0.0-1.0; this scales them to 0-100.
        Returns None when the category or score is absent.
        """
        try:
            categories = lighthouse_result.get('categories', {})
            cat_data = categories.get(category, {})
            score = cat_data.get('score')
            if score is not None:
                return int(score * 100)
            return None
        except Exception:
            return None

    def _parse_response(
        self,
        response_data: Dict[str, Any],
        url: str,
        strategy: str
    ) -> PageSpeedResult:
        """Parse a PageSpeed API response into a structured result."""
        lighthouse = response_data.get('lighthouseResult', {})

        # Extract Lighthouse scores
        scores = LighthouseScores(
            performance=self._extract_score(lighthouse, 'performance'),
            accessibility=self._extract_score(lighthouse, 'accessibility'),
            best_practices=self._extract_score(lighthouse, 'best-practices'),
            seo=self._extract_score(lighthouse, 'seo'),
        )

        def to_ms(value: Optional[float]) -> Optional[int]:
            # 0 is a legitimate metric value (total-blocking-time is often
            # exactly 0 ms), so test against None rather than truthiness.
            return int(value) if value is not None else None

        # Extract Core Web Vitals (millisecond metrics, plus unitless CLS)
        vitals = CoreWebVitals(
            lcp_ms=to_ms(self._extract_metric_value(lighthouse, 'largest-contentful-paint')),
            fid_ms=to_ms(self._extract_metric_value(lighthouse, 'max-potential-fid')),
            inp_ms=to_ms(self._extract_metric_value(lighthouse, 'interaction-to-next-paint')),
            cls=None,  # set below, needs rounding rather than int()
            fcp_ms=to_ms(self._extract_metric_value(lighthouse, 'first-contentful-paint')),
            ttfb_ms=to_ms(self._extract_metric_value(lighthouse, 'server-response-time')),
            tbt_ms=to_ms(self._extract_metric_value(lighthouse, 'total-blocking-time')),
            speed_index_ms=to_ms(self._extract_metric_value(lighthouse, 'speed-index')),
        )
        cls_val = self._extract_metric_value(lighthouse, 'cumulative-layout-shift')
        vitals.cls = round(cls_val, 3) if cls_val is not None else None

        return PageSpeedResult(
            url=url,
            strategy=strategy,
            scores=scores,
            vitals=vitals,
            lighthouse_version=lighthouse.get('lighthouseVersion'),
            fetch_time=lighthouse.get('fetchTime'),
            raw_response=response_data,
        )

    @staticmethod
    def _error_result(url: str, strategy: str, message: str) -> PageSpeedResult:
        """Build an empty PageSpeedResult carrying only an error message."""
        return PageSpeedResult(
            url=url,
            strategy=strategy,
            scores=LighthouseScores(),
            vitals=CoreWebVitals(),
            error=message,
        )

    async def analyze(
        self,
        url: str,
        strategy: str = 'mobile',
        categories: Optional[List[str]] = None,
    ) -> PageSpeedResult:
        """
        Run PageSpeed analysis on a URL.

        Args:
            url: Full URL to analyze
            strategy: 'mobile' or 'desktop'
            categories: Lighthouse categories to include (default: all)

        Returns:
            PageSpeedResult with scores, metrics, and raw response. Errors
            (HTTP failure, timeout, exception) are captured in the result's
            ``error`` field rather than raised.
        """
        await self._rate_limit()

        if categories is None:
            categories = ['performance', 'accessibility', 'best-practices', 'seo']

        # The API accepts the 'category' parameter multiple times, so build
        # the query as a sequence of (key, value) pairs. urlencode() also
        # percent-encodes the target URL, which may itself contain '?', '&',
        # '#' or other reserved characters.
        query: List[tuple] = [('url', url), ('strategy', strategy)]
        query.extend(('category', category) for category in categories)
        if self.api_key:
            query.append(('key', self.api_key))
        request_url = f"{self.API_URL}?{urlencode(query)}"

        try:
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(request_url) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"PageSpeed API error for {url}: {response.status} - {error_text[:200]}")
                        return self._error_result(url, strategy, f"API error: {response.status}")
                    data = await response.json()
                    return self._parse_response(data, url, strategy)
        except asyncio.TimeoutError:
            logger.error(f"PageSpeed API timeout for {url} (>{self.timeout}s)")
            return self._error_result(url, strategy, f"Timeout after {self.timeout}s")
        except Exception as e:
            logger.error(f"PageSpeed API error for {url}: {e}")
            return self._error_result(url, strategy, str(e))

    async def analyze_multiple(
        self,
        urls: List[str],
        strategy: str = 'mobile',
        max_concurrent: int = 3,
    ) -> List[PageSpeedResult]:
        """
        Analyze multiple URLs with controlled concurrency.

        Args:
            urls: List of URLs to analyze
            strategy: 'mobile' or 'desktop'
            max_concurrent: Max concurrent requests (the per-request
                _rate_limit still applies on top of this cap)

        Returns:
            List of PageSpeedResult for each URL, in input order.
        """
        semaphore = asyncio.Semaphore(max_concurrent)

        async def analyze_with_semaphore(url: str) -> PageSpeedResult:
            async with semaphore:
                return await self.analyze(url, strategy)

        tasks = [analyze_with_semaphore(url) for url in urls]
        return await asyncio.gather(*tasks)