We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""
PageSpeed Analyzer with Anomaly Detection.
Compares PageSpeed results against historical baselines to detect
performance regressions and Core Web Vital threshold violations.
"""
import os
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from datetime import datetime
from supabase import create_client, Client
from ..utils.progress import logger
from ..clients.pagespeed_client import (
PageSpeedResult,
CWV_THRESHOLDS,
get_cwv_status,
)
@dataclass
class Baseline:
    """Historical baseline for a URL/strategy combination.

    Populated from the `get_performance_baseline` Supabase RPC; every
    avg_* field may be None when no historical samples exist.
    """
    url: str  # page URL the baseline belongs to
    strategy: str  # PageSpeed strategy key -- presumably 'mobile'/'desktop'; confirm against PageSpeedClient
    avg_performance: Optional[float] = None  # mean Lighthouse performance score (points)
    avg_lcp: Optional[float] = None  # mean Largest Contentful Paint (ms)
    avg_cls: Optional[float] = None  # mean Cumulative Layout Shift (unitless)
    avg_fcp: Optional[float] = None  # mean First Contentful Paint (ms)
    avg_tbt: Optional[float] = None  # mean Total Blocking Time (ms)
    sample_count: int = 0  # number of historical rows averaged into the means
    @property
    def has_data(self) -> bool:
        # A baseline is only usable when at least one sample contributed.
        return self.sample_count > 0
@dataclass
class AnomalyDetail:
    """Details about a specific anomaly detected."""
    metric: str  # e.g. 'performance_score', 'lcp', 'cls', 'tbt', or '<metric>_threshold' for absolute violations
    current_value: float  # value observed in the current run
    baseline_value: Optional[float]  # historical average; None for absolute-threshold violations
    threshold: Optional[float]  # threshold that was exceeded to trigger this anomaly
    change: Optional[float]  # signed delta vs. baseline (negative = score drop); None for threshold checks
    severity: str  # 'warning', 'critical'
    description: str  # human-readable summary, used in stored anomaly details
@dataclass
class AnomalyResult:
    """Outcome of anomaly detection for one URL/strategy measurement."""
    url: str
    strategy: str
    is_anomaly: bool
    anomalies: List[AnomalyDetail] = field(default_factory=list)
    baseline: Optional[Baseline] = None

    @property
    def has_critical(self) -> bool:
        """True when at least one detected anomaly is critical."""
        for detail in self.anomalies:
            if detail.severity == 'critical':
                return True
        return False

    @property
    def severity(self) -> str:
        """Overall severity: 'critical' beats 'warning' beats 'none'."""
        if self.has_critical:
            return 'critical'
        return 'warning' if self.anomalies else 'none'

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this result into a plain dict suitable for storage."""
        detail_keys = (
            'metric', 'current_value', 'baseline_value',
            'threshold', 'change', 'severity', 'description',
        )
        detail_rows = [
            {key: getattr(detail, key) for key in detail_keys}
            for detail in self.anomalies
        ]
        samples = self.baseline.sample_count if self.baseline else 0
        return {
            'is_anomaly': self.is_anomaly,
            'severity': self.severity,
            'anomaly_count': len(self.anomalies),
            'anomalies': detail_rows,
            'baseline_sample_count': samples,
        }
# Anomaly detection thresholds.
# Keys follow '<metric>_<drop|increase>_<severity>'; each value is the minimum
# regression relative to the historical baseline that triggers an anomaly of
# that severity in the _check_*_regression methods below.
ANOMALY_THRESHOLDS = {
    # Performance score drops
    'performance_score_drop_warning': 10,  # Warning if drops > 10 points
    'performance_score_drop_critical': 20,  # Critical if drops > 20 points
    # LCP increases (milliseconds)
    'lcp_increase_warning': 500,  # Warning if increases > 500ms
    'lcp_increase_critical': 1000,  # Critical if increases > 1000ms
    # CLS increases
    'cls_increase_warning': 0.05,  # Warning if increases > 0.05
    'cls_increase_critical': 0.1,  # Critical if increases > 0.1
    # TBT increases (milliseconds)
    'tbt_increase_warning': 200,  # Warning if increases > 200ms
    'tbt_increase_critical': 500,  # Critical if increases > 500ms
    # FCP increases (milliseconds)
    'fcp_increase_warning': 500,  # Warning if increases > 500ms
    'fcp_increase_critical': 1000,  # Critical if increases > 1000ms
}
class PageSpeedAnalyzer:
"""
Analyzes PageSpeed results for anomalies.
Compares against:
1. Historical baseline (7-day average)
2. Core Web Vital thresholds (Good/Needs Improvement/Poor)
3. Absolute threshold violations
"""
def __init__(
self,
supabase_url: Optional[str] = None,
supabase_key: Optional[str] = None,
baseline_days: int = 7,
):
"""
Initialize analyzer.
Args:
supabase_url: Supabase project URL (or from env)
supabase_key: Supabase service role key (or from env)
baseline_days: Days to look back for baseline calculation
"""
self.supabase_url = supabase_url or os.getenv('SUPABASE_URL')
self.supabase_key = supabase_key or os.getenv('SUPABASE_SERVICE_ROLE_KEY')
self.baseline_days = baseline_days
if not self.supabase_url or not self.supabase_key:
raise ValueError(
"Supabase credentials required. Set SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY."
)
self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
def get_baseline(self, url: str, strategy: str) -> Baseline:
"""
Get historical baseline for a URL/strategy combination.
Uses the get_performance_baseline Supabase function.
"""
try:
response = self.supabase.rpc(
'get_performance_baseline',
{
'p_url': url,
'p_strategy': strategy,
'p_days': self.baseline_days,
}
).execute()
if response.data and len(response.data) > 0:
row = response.data[0]
return Baseline(
url=url,
strategy=strategy,
avg_performance=row.get('avg_performance'),
avg_lcp=row.get('avg_lcp'),
avg_cls=row.get('avg_cls'),
avg_fcp=row.get('avg_fcp'),
avg_tbt=row.get('avg_tbt'),
sample_count=row.get('sample_count') or 0,
)
except Exception as e:
logger.warning(f"Failed to get baseline for {url} ({strategy}): {e}")
return Baseline(url=url, strategy=strategy)
def _check_score_regression(
self,
result: PageSpeedResult,
baseline: Baseline,
anomalies: List[AnomalyDetail]
):
"""Check for performance score regressions."""
if result.scores.performance is None:
return
if baseline.has_data and baseline.avg_performance is not None:
drop = baseline.avg_performance - result.scores.performance
if drop >= ANOMALY_THRESHOLDS['performance_score_drop_critical']:
anomalies.append(AnomalyDetail(
metric='performance_score',
current_value=result.scores.performance,
baseline_value=baseline.avg_performance,
threshold=ANOMALY_THRESHOLDS['performance_score_drop_critical'],
change=-drop,
severity='critical',
description=f"Performance score dropped by {drop:.0f} points (from {baseline.avg_performance:.0f} to {result.scores.performance})",
))
elif drop >= ANOMALY_THRESHOLDS['performance_score_drop_warning']:
anomalies.append(AnomalyDetail(
metric='performance_score',
current_value=result.scores.performance,
baseline_value=baseline.avg_performance,
threshold=ANOMALY_THRESHOLDS['performance_score_drop_warning'],
change=-drop,
severity='warning',
description=f"Performance score dropped by {drop:.0f} points (from {baseline.avg_performance:.0f} to {result.scores.performance})",
))
def _check_lcp_regression(
self,
result: PageSpeedResult,
baseline: Baseline,
anomalies: List[AnomalyDetail]
):
"""Check for LCP regressions and threshold violations."""
if result.vitals.lcp_ms is None:
return
lcp_ms = result.vitals.lcp_ms
# Check vs baseline
if baseline.has_data and baseline.avg_lcp is not None:
increase = lcp_ms - baseline.avg_lcp
if increase >= ANOMALY_THRESHOLDS['lcp_increase_critical']:
anomalies.append(AnomalyDetail(
metric='lcp',
current_value=lcp_ms,
baseline_value=baseline.avg_lcp,
threshold=ANOMALY_THRESHOLDS['lcp_increase_critical'],
change=increase,
severity='critical',
description=f"LCP increased by {increase:.0f}ms (from {baseline.avg_lcp:.0f}ms to {lcp_ms}ms)",
))
elif increase >= ANOMALY_THRESHOLDS['lcp_increase_warning']:
anomalies.append(AnomalyDetail(
metric='lcp',
current_value=lcp_ms,
baseline_value=baseline.avg_lcp,
threshold=ANOMALY_THRESHOLDS['lcp_increase_warning'],
change=increase,
severity='warning',
description=f"LCP increased by {increase:.0f}ms (from {baseline.avg_lcp:.0f}ms to {lcp_ms}ms)",
))
# Check vs CWV thresholds (crossing from Good to Needs Improvement or Poor)
status = get_cwv_status('lcp', lcp_ms)
if status == 'poor':
anomalies.append(AnomalyDetail(
metric='lcp_threshold',
current_value=lcp_ms,
baseline_value=None,
threshold=CWV_THRESHOLDS['lcp']['needs_improvement'],
change=None,
severity='critical',
description=f"LCP ({lcp_ms}ms) is in 'Poor' range (>{CWV_THRESHOLDS['lcp']['needs_improvement']}ms)",
))
def _check_cls_regression(
self,
result: PageSpeedResult,
baseline: Baseline,
anomalies: List[AnomalyDetail]
):
"""Check for CLS regressions and threshold violations."""
if result.vitals.cls is None:
return
cls = result.vitals.cls
# Check vs baseline
if baseline.has_data and baseline.avg_cls is not None:
increase = cls - baseline.avg_cls
if increase >= ANOMALY_THRESHOLDS['cls_increase_critical']:
anomalies.append(AnomalyDetail(
metric='cls',
current_value=cls,
baseline_value=baseline.avg_cls,
threshold=ANOMALY_THRESHOLDS['cls_increase_critical'],
change=increase,
severity='critical',
description=f"CLS increased by {increase:.3f} (from {baseline.avg_cls:.3f} to {cls:.3f})",
))
elif increase >= ANOMALY_THRESHOLDS['cls_increase_warning']:
anomalies.append(AnomalyDetail(
metric='cls',
current_value=cls,
baseline_value=baseline.avg_cls,
threshold=ANOMALY_THRESHOLDS['cls_increase_warning'],
change=increase,
severity='warning',
description=f"CLS increased by {increase:.3f} (from {baseline.avg_cls:.3f} to {cls:.3f})",
))
# Check vs CWV thresholds
status = get_cwv_status('cls', cls)
if status == 'poor':
anomalies.append(AnomalyDetail(
metric='cls_threshold',
current_value=cls,
baseline_value=None,
threshold=CWV_THRESHOLDS['cls']['needs_improvement'],
change=None,
severity='critical',
description=f"CLS ({cls:.3f}) is in 'Poor' range (>{CWV_THRESHOLDS['cls']['needs_improvement']})",
))
def _check_tbt_regression(
self,
result: PageSpeedResult,
baseline: Baseline,
anomalies: List[AnomalyDetail]
):
"""Check for TBT (Total Blocking Time) regressions."""
if result.vitals.tbt_ms is None:
return
tbt_ms = result.vitals.tbt_ms
if baseline.has_data and baseline.avg_tbt is not None:
increase = tbt_ms - baseline.avg_tbt
if increase >= ANOMALY_THRESHOLDS['tbt_increase_critical']:
anomalies.append(AnomalyDetail(
metric='tbt',
current_value=tbt_ms,
baseline_value=baseline.avg_tbt,
threshold=ANOMALY_THRESHOLDS['tbt_increase_critical'],
change=increase,
severity='critical',
description=f"TBT increased by {increase:.0f}ms (from {baseline.avg_tbt:.0f}ms to {tbt_ms}ms)",
))
elif increase >= ANOMALY_THRESHOLDS['tbt_increase_warning']:
anomalies.append(AnomalyDetail(
metric='tbt',
current_value=tbt_ms,
baseline_value=baseline.avg_tbt,
threshold=ANOMALY_THRESHOLDS['tbt_increase_warning'],
change=increase,
severity='warning',
description=f"TBT increased by {increase:.0f}ms (from {baseline.avg_tbt:.0f}ms to {tbt_ms}ms)",
))
def analyze(self, result: PageSpeedResult) -> AnomalyResult:
"""
Analyze a PageSpeed result for anomalies.
Args:
result: PageSpeedResult from PageSpeedClient
Returns:
AnomalyResult with detected anomalies
"""
if not result.is_success:
return AnomalyResult(
url=result.url,
strategy=result.strategy,
is_anomaly=False,
anomalies=[],
)
# Get historical baseline
baseline = self.get_baseline(result.url, result.strategy)
# Check for various anomalies
anomalies: List[AnomalyDetail] = []
self._check_score_regression(result, baseline, anomalies)
self._check_lcp_regression(result, baseline, anomalies)
self._check_cls_regression(result, baseline, anomalies)
self._check_tbt_regression(result, baseline, anomalies)
return AnomalyResult(
url=result.url,
strategy=result.strategy,
is_anomaly=len(anomalies) > 0,
anomalies=anomalies,
baseline=baseline,
)
def store_result(
self,
result: PageSpeedResult,
anomaly_result: AnomalyResult,
deployment_type: Optional[str] = None,
commit_sha: Optional[str] = None,
deployment_id: Optional[str] = None,
) -> Optional[str]:
"""
Store PageSpeed result in Supabase.
Args:
result: PageSpeedResult from analysis
anomaly_result: AnomalyResult from anomaly detection
deployment_type: 'frontend', 'api', or 'manual'
commit_sha: Git commit SHA
deployment_id: Unique ID for this deployment run
Returns:
UUID of inserted record, or None on error
"""
if not result.is_success:
logger.warning(f"Skipping storage for failed result: {result.url}")
return None
data = {
'url': result.url,
'strategy': result.strategy,
'performance_score': result.scores.performance,
'accessibility_score': result.scores.accessibility,
'best_practices_score': result.scores.best_practices,
'seo_score': result.scores.seo,
'lcp_ms': result.vitals.lcp_ms,
'fid_ms': result.vitals.fid_ms,
'inp_ms': result.vitals.inp_ms,
'cls': result.vitals.cls,
'fcp_ms': result.vitals.fcp_ms,
'ttfb_ms': result.vitals.ttfb_ms,
'tbt_ms': result.vitals.tbt_ms,
'speed_index_ms': result.vitals.speed_index_ms,
'deployment_type': deployment_type,
'commit_sha': commit_sha,
'deployment_id': deployment_id,
'is_anomaly': anomaly_result.is_anomaly,
'anomaly_details': anomaly_result.to_dict() if anomaly_result.is_anomaly else None,
# Don't store raw_response to save space (can re-fetch if needed)
}
try:
response = self.supabase.table('performance_scores').insert(data).execute()
if response.data and len(response.data) > 0:
return response.data[0].get('id')
except Exception as e:
logger.error(f"Failed to store result for {result.url}: {e}")
return None
def update_github_issue(
self,
record_id: str,
issue_number: int,
issue_url: str
):
"""Update a performance_scores record with GitHub issue info."""
try:
self.supabase.table('performance_scores').update({
'github_issue_number': issue_number,
'github_issue_url': issue_url,
}).eq('id', record_id).execute()
except Exception as e:
logger.error(f"Failed to update GitHub issue for {record_id}: {e}")