"""
DevTools data-driven anomaly feedback system for Percepta MCP.

Injects a lightweight monitoring script into a page, collects console, network,
and performance data, and distills it into anomaly reports with actionable
recommendations.
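
Typical usage (hypothetical sketch; assumes a configured ``Settings`` instance):

    analyzer = DevToolsAnalyzer(settings)
    result = await analyzer.monitor_and_analyze("https://example.com")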
"""
import asyncio
import json
import logging
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional

from ..config import Settings
from .browser_tools import BrowserAutomation
logger = logging.getLogger(__name__)


class DevToolsAnalyzer:
    """Analyzes DevTools data to detect anomalies and provide feedback."""

    def __init__(self, settings: Settings, browser_automation: Optional[BrowserAutomation] = None):
self.settings = settings
self.browser_automation = browser_automation or BrowserAutomation(settings)
self.console_logs: List[Dict[str, Any]] = []
self.network_logs: List[Dict[str, Any]] = []
self.performance_metrics: Dict[str, Any] = {}
self.anomalies: List[Dict[str, Any]] = []

    async def start_monitoring(self, url: str) -> Dict[str, Any]:
        """
        Start monitoring DevTools data for the given URL.

        Args:
            url: The URL to monitor.

        Returns:
            Dictionary containing the monitoring status.
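
        Example (hypothetical usage; assumes an initialized analyzer):
            status = await analyzer.start_monitoring("https://example.com")
            if status["monitoring_active"]:
                ...  # interact with the page, then collect and analyze data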
"""
try:
logger.info(f"Starting DevTools monitoring for {url}")
# Navigate to the page
result = await self.browser_automation.navigate(url)
if not result.get("success"):
return {
"success": False,
"error": f"Failed to navigate to {url}: {result.get('error')}",
"monitoring_active": False
}
# Enable console and network monitoring via script injection
monitoring_script = """
            // Reuse an existing monitor so re-injection does not re-wrap the console methods
            window.devToolsMonitor = window.devToolsMonitor || {
consoleLogs: [],
networkRequests: [],
performanceMetrics: {},
init: function() {
// Capture console logs
const originalLog = console.log;
const originalError = console.error;
const originalWarn = console.warn;
console.log = function(...args) {
window.devToolsMonitor.consoleLogs.push({
type: 'log',
message: args.join(' '),
timestamp: Date.now(),
url: window.location.href
});
originalLog.apply(console, args);
};
console.error = function(...args) {
window.devToolsMonitor.consoleLogs.push({
type: 'error',
message: args.join(' '),
timestamp: Date.now(),
url: window.location.href
});
originalError.apply(console, args);
};
console.warn = function(...args) {
window.devToolsMonitor.consoleLogs.push({
type: 'warn',
message: args.join(' '),
timestamp: Date.now(),
url: window.location.href
});
originalWarn.apply(console, args);
};
// Monitor network requests
const originalFetch = window.fetch;
window.fetch = function(...args) {
const startTime = Date.now();
                    // args[0] may be a Request or URL object rather than a plain string
                    const url = args[0] instanceof Request ? args[0].url : String(args[0]);
return originalFetch.apply(this, args).then(response => {
const endTime = Date.now();
window.devToolsMonitor.networkRequests.push({
url: url,
method: args[1]?.method || 'GET',
status: response.status,
statusText: response.statusText,
duration: endTime - startTime,
timestamp: startTime
});
return response;
}).catch(error => {
const endTime = Date.now();
window.devToolsMonitor.networkRequests.push({
url: url,
method: args[1]?.method || 'GET',
status: 0,
statusText: 'Network Error',
error: error.message,
duration: endTime - startTime,
timestamp: startTime
});
throw error;
});
};
// Capture performance metrics
if (window.performance) {
                setTimeout(() => {
                    const navigation = performance.getEntriesByType('navigation')[0];
                    if (!navigation) { return; }
                    const resources = performance.getEntriesByType('resource');
                    window.devToolsMonitor.performanceMetrics = {
                        // Elapsed time from navigation start, not the duration of the event handlers
                        domContentLoaded: navigation.domContentLoadedEventEnd - navigation.startTime,
                        loadComplete: navigation.loadEventEnd - navigation.startTime,
                        totalResources: resources.length,
                        slowResources: resources.filter(r => r.duration > 1000).length,
                        // transferSize of 0 alone can be a cache hit; require an empty body as well
                        failedResources: resources.filter(r => r.transferSize === 0 && r.decodedBodySize === 0).length,
                        timestamp: Date.now()
                    };
                }, 2000);
}
// Listen for unhandled errors
window.addEventListener('error', function(event) {
window.devToolsMonitor.consoleLogs.push({
type: 'error',
message: `Unhandled error: ${event.message}`,
filename: event.filename,
lineno: event.lineno,
colno: event.colno,
timestamp: Date.now(),
url: window.location.href
});
});
// Listen for unhandled promise rejections
window.addEventListener('unhandledrejection', function(event) {
window.devToolsMonitor.consoleLogs.push({
type: 'error',
message: `Unhandled promise rejection: ${event.reason}`,
timestamp: Date.now(),
url: window.location.href
});
});
}
};
            if (!window.devToolsMonitor.installed) {
                window.devToolsMonitor.installed = true;
                window.devToolsMonitor.init();
            }
"""
            # Inject the monitoring script and surface injection failures immediately
            inject_result = await self.browser_automation.evaluate_script(monitoring_script)
            if not inject_result.get("success"):
                return {
                    "success": False,
                    "error": f"Failed to inject monitoring script: {inject_result.get('error')}",
                    "monitoring_active": False
                }
return {
"success": True,
"monitoring_active": True,
"url": url,
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Error starting DevTools monitoring: {e}")
return {
"success": False,
"error": f"Failed to start monitoring: {str(e)}",
"monitoring_active": False
}

    async def collect_devtools_data(self) -> Dict[str, Any]:
        """
        Collect current DevTools data from the monitored page.

        Returns:
            Dictionary containing the collected DevTools data.
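
        Example (hypothetical usage; assumes start_monitoring() succeeded):
            result = await analyzer.collect_devtools_data()
            if result["success"]:
                print(result["data"]["total_console_entries"])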
"""
try:
logger.info("Collecting DevTools data")
# Extract data from the injected monitoring script
data_script = """
return window.devToolsMonitor ? {
consoleLogs: window.devToolsMonitor.consoleLogs,
networkRequests: window.devToolsMonitor.networkRequests,
performanceMetrics: window.devToolsMonitor.performanceMetrics
} : null;
"""
result = await self.browser_automation.evaluate_script(data_script)
if not result.get("success") or not result.get("data"):
return {
"success": False,
"error": "No DevTools data available or monitoring not active",
"data": None
}
devtools_data = result["data"]
# Store the data internally
self.console_logs = devtools_data.get("consoleLogs", [])
self.network_logs = devtools_data.get("networkRequests", [])
self.performance_metrics = devtools_data.get("performanceMetrics", {})
return {
"success": True,
"data": {
"console_logs": self.console_logs,
"network_logs": self.network_logs,
"performance_metrics": self.performance_metrics,
"total_console_entries": len(self.console_logs),
"total_network_requests": len(self.network_logs),
"timestamp": datetime.now(UTC).isoformat()
}
}
except Exception as e:
logger.error(f"Error collecting DevTools data: {e}")
return {
"success": False,
"error": f"Failed to collect DevTools data: {str(e)}",
"data": None
}

    async def analyze_anomalies(self) -> Dict[str, Any]:
        """
        Analyze collected DevTools data to detect anomalies.

        Returns:
            Dictionary containing detected anomalies and a severity summary.
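
        Example (hypothetical usage; assumes collect_devtools_data() ran first):
            result = await analyzer.analyze_anomalies()
            for anomaly in result["anomalies"]:
                # e.g. {"type": "console_errors", "severity": "high", "count": 3, ...}
                print(anomaly["type"], anomaly["severity"])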
"""
try:
logger.info("Analyzing DevTools data for anomalies")
anomalies = []
# Analyze console logs for errors and warnings
error_logs = [log for log in self.console_logs if log.get("type") == "error"]
warn_logs = [log for log in self.console_logs if log.get("type") == "warn"]
if error_logs:
anomalies.append({
"type": "console_errors",
"severity": "high",
"count": len(error_logs),
"description": f"Found {len(error_logs)} console errors",
"details": error_logs[:5], # Show first 5 errors
"timestamp": datetime.now(UTC).isoformat()
})
if warn_logs:
anomalies.append({
"type": "console_warnings",
"severity": "medium",
"count": len(warn_logs),
"description": f"Found {len(warn_logs)} console warnings",
"details": warn_logs[:5], # Show first 5 warnings
"timestamp": datetime.now(UTC).isoformat()
})
            # Analyze network requests for failures; status 0 marks a client-side network error
            failed_requests = [
                req for req in self.network_logs
                if req.get("status", 200) >= 400 or req.get("status") == 0
            ]
slow_requests = [req for req in self.network_logs if req.get("duration", 0) > 5000]
if failed_requests:
anomalies.append({
"type": "network_failures",
"severity": "high",
"count": len(failed_requests),
"description": f"Found {len(failed_requests)} failed network requests",
"details": failed_requests[:5],
"timestamp": datetime.now(UTC).isoformat()
})
if slow_requests:
anomalies.append({
"type": "slow_requests",
"severity": "medium",
"count": len(slow_requests),
"description": f"Found {len(slow_requests)} slow network requests (>5s)",
"details": slow_requests[:5],
"timestamp": datetime.now(UTC).isoformat()
})
# Analyze performance metrics
if self.performance_metrics:
dom_load_time = self.performance_metrics.get("domContentLoaded", 0)
load_complete_time = self.performance_metrics.get("loadComplete", 0)
failed_resources = self.performance_metrics.get("failedResources", 0)
if dom_load_time > 3000: # > 3 seconds
anomalies.append({
"type": "slow_dom_load",
"severity": "medium",
"value": dom_load_time,
"description": f"DOM content loaded slowly: {dom_load_time}ms",
"timestamp": datetime.now(UTC).isoformat()
})
if load_complete_time > 5000: # > 5 seconds
anomalies.append({
"type": "slow_page_load",
"severity": "medium",
"value": load_complete_time,
"description": f"Page load completed slowly: {load_complete_time}ms",
"timestamp": datetime.now(UTC).isoformat()
})
if failed_resources > 0:
anomalies.append({
"type": "failed_resources",
"severity": "high",
"count": failed_resources,
"description": f"Found {failed_resources} failed resource loads",
"timestamp": datetime.now(UTC).isoformat()
})
self.anomalies = anomalies
return {
"success": True,
"anomalies": anomalies,
"summary": {
"total_anomalies": len(anomalies),
"high_severity": len([a for a in anomalies if a.get("severity") == "high"]),
"medium_severity": len([a for a in anomalies if a.get("severity") == "medium"]),
"low_severity": len([a for a in anomalies if a.get("severity") == "low"]),
"analysis_timestamp": datetime.now(UTC).isoformat()
}
}
except Exception as e:
logger.error(f"Error analyzing anomalies: {e}")
return {
"success": False,
"error": f"Failed to analyze anomalies: {str(e)}",
"anomalies": [],
"summary": {
"total_anomalies": 0,
"high_severity": 0,
"medium_severity": 0,
"low_severity": 0,
"analysis_timestamp": datetime.now(UTC).isoformat()
}
}

    async def generate_feedback_report(self, include_recommendations: bool = True) -> Dict[str, Any]:
        """
        Generate a comprehensive feedback report based on DevTools analysis.

        Args:
            include_recommendations: Whether to include recommendations in the report.

        Returns:
            Dictionary containing the feedback report.
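
        Example (hypothetical usage; assumes monitoring is active):
            result = await analyzer.generate_feedback_report(include_recommendations=True)
            if result["success"]:
                print(result["report"]["summary"]["overall_health"])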
"""
try:
logger.info("Generating DevTools feedback report")
# Collect current data
data_result = await self.collect_devtools_data()
if not data_result.get("success"):
return {
"success": False,
"error": "Failed to collect DevTools data for report",
"report": None
}
# Analyze anomalies
anomaly_result = await self.analyze_anomalies()
if not anomaly_result.get("success"):
return {
"success": False,
"error": "Failed to analyze anomalies for report",
"report": None
}
# Generate recommendations based on anomalies
recommendations = []
if include_recommendations:
for anomaly in anomaly_result.get("anomalies", []):
if anomaly.get("type") == "console_errors":
recommendations.append({
"category": "JavaScript Errors",
"priority": "high",
"suggestion": "Fix JavaScript errors to improve page stability and user experience",
"action": "Review and fix the console errors listed in the details"
})
elif anomaly.get("type") == "network_failures":
recommendations.append({
"category": "Network Issues",
"priority": "high",
"suggestion": "Investigate and fix failed network requests",
"action": "Check server endpoints and network connectivity"
})
elif anomaly.get("type") == "slow_requests":
recommendations.append({
"category": "Performance",
"priority": "medium",
"suggestion": "Optimize slow network requests to improve page load times",
"action": "Implement caching, optimize API responses, or use CDN"
})
elif anomaly.get("type") == "slow_page_load":
recommendations.append({
"category": "Performance",
"priority": "medium",
"suggestion": "Optimize page load performance",
"action": "Minimize resources, optimize images, and reduce JavaScript bundle size"
})
elif anomaly.get("type") == "slow_dom_load":
recommendations.append({
"category": "Performance",
"priority": "medium",
"suggestion": "Optimize DOM content loading time",
"action": "Reduce JavaScript blocking, optimize CSS delivery, and minimize DOM size"
})
                # Fall back to a generic recommendation when anomalies exist but none matched above
                if anomaly_result.get("anomalies") and not recommendations:
                    recommendations.append({
                        "category": "General",
                        "priority": "medium",
                        "suggestion": "Review detected anomalies and consider optimization opportunities",
                        "action": "Analyze the anomaly details and implement appropriate fixes"
                    })
            anomaly_list = anomaly_result.get("anomalies", [])
            high_severity_count = len([a for a in anomaly_list if a.get("severity") == "high"])
            if not anomaly_list:
                overall_health = "good"
            elif high_severity_count > 0:
                overall_health = "poor"
            else:
                overall_health = "fair"
            report = {
                "summary": {
                    "url": self.browser_automation.page.url if self.browser_automation.page else "Unknown",
                    "scan_timestamp": datetime.now(UTC).isoformat(),
                    "total_console_logs": len(self.console_logs),
                    "total_network_requests": len(self.network_logs),
                    "total_anomalies": len(anomaly_list),
                    "overall_health": overall_health
                },
"anomalies": anomaly_result.get("anomalies", []),
"recommendations": recommendations,
"raw_data": {
"console_logs": self.console_logs,
"network_logs": self.network_logs,
"performance_metrics": self.performance_metrics
}
}
return {
"success": True,
"report": report,
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Error generating feedback report: {e}")
return {
"success": False,
"error": f"Failed to generate feedback report: {str(e)}",
"report": None
}

    async def monitor_and_analyze(self, url: str, duration_seconds: int = 30) -> Dict[str, Any]:
        """
        Monitor a page for the specified duration and generate a complete analysis.

        Args:
            url: The URL to monitor.
            duration_seconds: How long to monitor (default: 30 seconds).

        Returns:
            Dictionary containing the complete monitoring and analysis results.
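
        Example (hypothetical usage):
            result = await analyzer.monitor_and_analyze("https://example.com", duration_seconds=60)
            report = result.get("report")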
"""
try:
logger.info(f"Starting comprehensive monitoring of {url} for {duration_seconds} seconds")
# Start monitoring
monitor_result = await self.start_monitoring(url)
if not monitor_result.get("success"):
return monitor_result
# Wait for the specified duration to collect data
await asyncio.sleep(duration_seconds)
            # Generate the final report and propagate any failure
            report_result = await self.generate_feedback_report()
            if not report_result.get("success"):
                return report_result
            return {
                "success": True,
"monitoring_duration": duration_seconds,
"url": url,
"report": report_result.get("report"),
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Error during monitoring and analysis: {e}")
return {
"success": False,
"error": f"Failed to complete monitoring and analysis: {str(e)}",
"report": None
}
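

# Minimal manual smoke test. This sketch assumes ``Settings()`` can be
# constructed with defaults and that ``BrowserAutomation`` manages its own
# browser lifecycle; both are assumptions about the surrounding package.
# Because of the relative imports above, run it as a module from the package
# root (``python -m``) rather than executing this file directly.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def _demo() -> None:
        analyzer = DevToolsAnalyzer(Settings())
        result = await analyzer.monitor_and_analyze("https://example.com", duration_seconds=10)
        print(json.dumps((result.get("report") or {}).get("summary", {}), indent=2))

    asyncio.run(_demo())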