"""Skill runner for direct execution via browser fetch().
Executes skills directly by running fetch() from within the browser context.
This leverages the browser's cookie jar and auth state for:
- Automatic session handling
- No CORS issues (request from page context)
- Preserved authentication
Much faster than agent navigation (~1-3s vs ~60-120s).
Key implementation detail: Uses session-scoped CDP commands with `session_id`
to bypass browser-use's watchdog system and avoid hangs. The Page and Runtime
domains must be enabled on the CDP session before use.
"""
import asyncio
import ipaddress
import json
import logging
import socket
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse
import jmespath
from jmespath.exceptions import JMESPathError
from .models import AuthRecovery, Skill, SkillRequest
if TYPE_CHECKING:
from browser_use.browser.session import BrowserSession, CDPSession
logger = logging.getLogger(__name__)
# Blocked hostnames (case-insensitive) - comprehensive localhost variants
_BLOCKED_HOSTS = frozenset(
{
"localhost",
"127.0.0.1",
"::1",
"0.0.0.0",
"[::1]",
"[::]",
"[0:0:0:0:0:0:0:0]",
"[0:0:0:0:0:0:0:1]",
}
)
def _normalize_ip(host: str) -> ipaddress.IPv4Address | ipaddress.IPv6Address | None:
"""Parse IP from various formats (decimal, octal, hex, bracketed IPv6).
Handles:
- Standard IPv4: 127.0.0.1
- Decimal IPv4: 2130706433 (= 127.0.0.1)
- IPv6: ::1, fe80::1
- Bracketed IPv6: [::1], [fe80::1]
"""
clean = host.strip("[]")
# Handle decimal notation: 2130706433 -> 127.0.0.1
if clean.isdigit():
try:
return ipaddress.IPv4Address(int(clean))
except ValueError:
pass
# Handle standard notation (IPv4 or IPv6)
try:
return ipaddress.ip_address(clean)
except ValueError:
return None
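# A quick illustration of the accepted notations (values are illustrative, not
# taken from any real request):
#
#   >>> _normalize_ip("2130706433")
#   IPv4Address('127.0.0.1')
#   >>> _normalize_ip("[::1]")
#   IPv6Address('::1')
#   >>> _normalize_ip("example.com")  # not an IP -> None; caller falls back to DNS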
def _is_ip_blocked(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
"""Check if IP is private, loopback, link-local, or reserved."""
return ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved or ip.is_multicast
async def validate_url_safe(url: str) -> None:
"""Validate URL is safe from SSRF attacks.
Raises ValueError if URL is unsafe. Checks:
- Scheme is http/https
- No credentials in URL (user:pass@host bypass)
- Hostname exists and is not blocked
- IP addresses are not private/reserved
- DNS resolution returns only public IPs (DNS rebinding protection)
"""
parsed = urlparse(url)
# Scheme check
if parsed.scheme not in ("http", "https"):
raise ValueError(f"Scheme '{parsed.scheme}' not allowed, use http/https")
# Reject URLs with credentials (user:pass@host bypass)
if parsed.username or parsed.password:
raise ValueError("URLs with credentials not allowed")
hostname = parsed.hostname
if not hostname:
raise ValueError("URL must have a hostname")
# Strip IPv6 zone ID (%eth0) - these can bypass some checks
if "%" in hostname:
hostname = hostname.split("%")[0]
# Check blocked hostnames
if hostname.lower() in _BLOCKED_HOSTS:
raise ValueError(f"Hostname '{hostname}' is blocked")
# Check if it's an IP address (various formats)
ip = _normalize_ip(hostname)
if ip is not None:
if _is_ip_blocked(ip):
raise ValueError(f"IP '{ip}' is blocked (private/reserved)")
return # Valid public IP
# DNS resolution - run in thread to avoid blocking event loop
try:
loop = asyncio.get_running_loop()
addr_info = await loop.run_in_executor(None, socket.getaddrinfo, hostname, None)
except socket.gaierror as e:
raise ValueError(f"Cannot resolve hostname '{hostname}': {e}") from e
# Check ALL resolved IPs (DNS rebinding protection)
for _family, _type, _proto, _canonname, sockaddr in addr_info:
resolved_ip = ipaddress.ip_address(sockaddr[0])
if _is_ip_blocked(resolved_ip):
raise ValueError(f"Hostname '{hostname}' resolves to blocked IP '{resolved_ip}'")
def validate_domain_allowed(url: str, allowed_domains: list[str]) -> None:
"""Validate URL domain is in allowlist.
Empty allowlist means all domains allowed (for backwards compatibility).
Supports subdomain matching: api.example.com matches allowlist entry example.com.
"""
if not allowed_domains:
return # No restrictions
hostname = urlparse(url).hostname
if not hostname:
raise ValueError("URL must have a hostname")
hostname_lower = hostname.lower()
for allowed in allowed_domains:
allowed_lower = allowed.lower()
# Exact match or subdomain match
if hostname_lower == allowed_lower or hostname_lower.endswith(f".{allowed_lower}"):
return
raise ValueError(f"Domain '{hostname}' not in allowlist: {allowed_domains}")
def build_url(template: str, params: dict[str, Any]) -> str:
"""Build URL from template with proper encoding.
Handles:
- Path parameters with URL encoding: /users/{id} -> /users/a%20b
- Query parameters with proper escaping
"""
parsed = urlparse(template)
# Substitute path parameters with URL encoding
path = parsed.path
for key, value in params.items():
placeholder = f"{{{key}}}"
if placeholder in path:
path = path.replace(placeholder, quote(str(value), safe=""))
# Substitute query parameters
query_dict = parse_qs(parsed.query, keep_blank_values=True)
new_query_items: list[tuple[str, str]] = []
for key, values in query_dict.items():
for val in values:
new_val = val
for pk, pv in params.items():
placeholder = f"{{{pk}}}"
if placeholder in new_val:
new_val = new_val.replace(placeholder, str(pv))
new_query_items.append((key, new_val))
new_query = urlencode(new_query_items, safe="")
return urlunparse((parsed.scheme, parsed.netloc, path, parsed.params, new_query, parsed.fragment))
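# Example substitution (template and values are illustrative):
#
#   >>> build_url("https://api.example.com/users/{id}?q={term}", {"id": "a b", "term": "x&y"})
#   'https://api.example.com/users/a%20b?q=x%26y'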
def extract_data(data: Any, expression: str | None) -> Any:
"""Extract data using JMESPath expression.
Supports:
- Simple paths: data.items
- Filters: items[?active==`true`].name
- Functions: length(items), sort_by(@, &name)
"""
if not expression:
return data
try:
return jmespath.search(expression, data)
except JMESPathError as e:
raise ValueError(f"JMESPath extraction failed: {e}") from e
# Legacy function for backwards compatibility - will be removed
def is_private_url(url: str) -> bool:
"""Check if URL resolves to private IP. DEPRECATED: Use validate_url_safe() instead."""
try:
parsed = urlparse(url)
hostname = parsed.hostname or ""
ip = _normalize_ip(hostname)
if ip is not None:
return _is_ip_blocked(ip)
# Resolve hostname (blocking - legacy behavior)
resolved = socket.gethostbyname(hostname)
ip = ipaddress.ip_address(resolved)
return _is_ip_blocked(ip)
    except Exception:
        # Fails open (returns False on any error) - preserved legacy behavior;
        # new callers should use validate_url_safe(), which fails closed.
        return False
@dataclass
class SkillRunResult:
"""Result of skill execution."""
success: bool
data: Any = None # Parsed response data
raw_response: str | None = None # Raw response body
status_code: int = 0
error: str | None = None
auth_recovery_triggered: bool = False
class SkillRunner:
"""Executes skills directly via browser fetch().
Usage:
runner = SkillRunner()
# Run with existing browser session
result = await runner.run(skill, params, browser_session)
# Or let runner manage browser
result = await runner.run_standalone(skill, params, browser_profile)
"""
def __init__(self, timeout: float = 30.0):
"""Initialize runner.
Args:
timeout: Request timeout in seconds
"""
self.timeout = timeout
async def run(
self,
skill: Skill,
params: dict[str, Any],
browser_session: "BrowserSession",
) -> SkillRunResult:
"""Execute a skill using an existing browser session.
Args:
skill: Skill with request configuration
params: Parameters to substitute in request
browser_session: Active browser session
Returns:
SkillRunResult with parsed data or error
"""
if not skill.request:
return SkillRunResult(
success=False,
error="Skill does not support direct execution (no request config)",
)
request = skill.request
auth_recovery = skill.auth_recovery
# Build the fetch URL with proper encoding
url = build_url(request.url, params)
# SSRF protection - comprehensive async check
try:
await validate_url_safe(url)
except ValueError as e:
return SkillRunResult(success=False, error=f"SSRF blocked: {e}")
# Domain allowlist enforcement (if configured)
allowed_domains = getattr(request, "allowed_domains", [])
try:
validate_domain_allowed(url, allowed_domains)
except ValueError as e:
return SkillRunResult(success=False, error=f"Domain not allowed: {e}")
parsed_url = urlparse(url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
logger.info(f"SkillRunner executing: {request.method} {url}")
# Get or create a CDP session with domains enabled
# This is critical: using session_id bypasses browser-use watchdogs
try:
cdp_session = await self._get_cdp_session(browser_session)
except Exception as e:
logger.error(f"Failed to initialize CDP session: {e}")
return SkillRunResult(success=False, error=f"CDP session failed: {e}")
# Navigate to the domain first to establish cookie context
try:
await self._navigate_to_domain(browser_session, cdp_session, base_url)
except Exception as e:
logger.error(f"Failed to navigate to domain {base_url}: {e}")
return SkillRunResult(success=False, error=f"Navigation failed: {e}")
# Execute the fetch
result = await self._execute_fetch(request, params, browser_session, cdp_session)
# Check if auth recovery is needed
        if not result.success and auth_recovery and self._should_recover_auth(result, auth_recovery):
            logger.info(f"Auth recovery triggered; recovery page: {auth_recovery.recovery_page}")
            result.auth_recovery_triggered = True
            # Return with the auth_recovery_triggered flag set - the caller must
            # handle recovery; we don't do it here because it requires agent interaction.
            result.error = f"Auth required - recovery page: {auth_recovery.recovery_page}"
return result
async def _get_cdp_session(self, browser_session: "BrowserSession") -> "CDPSession":
"""Get or create a CDP session with required domains enabled.
Uses session-scoped CDP commands to bypass watchdog interference.
Enables Page and Runtime domains needed for navigation and fetch execution.
Args:
browser_session: Browser session
Returns:
CDPSession with domains enabled
"""
# Get the active target ID (current tab)
cdp_session = await browser_session.get_or_create_cdp_session()
# Enable Page domain (required for navigation)
try:
await browser_session.cdp_client.send.Page.enable(session_id=cdp_session.session_id)
logger.debug(f"Enabled Page domain for session {cdp_session.session_id[-8:]}")
except Exception as e:
# May already be enabled by session manager
logger.debug(f"Page.enable: {e}")
# Enable Runtime domain (required for evaluate)
try:
await browser_session.cdp_client.send.Runtime.enable(session_id=cdp_session.session_id)
logger.debug(f"Enabled Runtime domain for session {cdp_session.session_id[-8:]}")
except Exception as e:
logger.debug(f"Runtime.enable: {e}")
return cdp_session
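    # Why session_id matters, per the module docstring: browser-use's watchdogs
    # hook the shared CDP connection, so unscoped commands can be intercepted or
    # stall. Scoping a command routes it straight to this tab's session, e.g.:
    #
    #   await browser_session.cdp_client.send.Runtime.evaluate(
    #       params={"expression": "1 + 1", "returnByValue": True},
    #       session_id=cdp_session.session_id,
    #   )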
async def _navigate_to_domain(
self,
browser_session: "BrowserSession",
cdp_session: "CDPSession",
base_url: str,
) -> None:
"""Navigate to the target domain to establish cookie context.
Uses session-scoped CDP Page.navigate to avoid watchdog interference.
Args:
browser_session: Browser session
cdp_session: CDP session with Page domain enabled
base_url: Base URL of the target domain
"""
# Get current URL
try:
current_url = await self._get_current_url(browser_session, cdp_session)
current_parsed = urlparse(current_url) if current_url else None
# Only navigate if we're not already on the same domain
target_parsed = urlparse(base_url)
if current_parsed and current_parsed.netloc == target_parsed.netloc:
logger.debug(f"Already on domain {target_parsed.netloc}, skipping navigation")
return
except Exception as e:
logger.debug(f"Could not get current URL: {e}, continuing with navigation")
# Navigate using CDP Page.navigate with session_id (bypasses watchdogs)
logger.debug(f"Navigating to domain: {base_url}")
nav_result = await browser_session.cdp_client.send.Page.navigate(
params={"url": base_url, "transitionType": "address_bar"},
session_id=cdp_session.session_id,
)
# Check for navigation errors
if nav_result.get("errorText"):
raise RuntimeError(f"Navigation failed: {nav_result['errorText']}")
        # Wait briefly for the page to stabilize so the cookie context is
        # established. A fixed delay keeps this simple; a Page lifecycle event
        # listener would be more precise but adds complexity.
        await asyncio.sleep(1.0)
async def _get_current_url(
self,
browser_session: "BrowserSession",
cdp_session: "CDPSession",
) -> str | None:
"""Get the current page URL.
Args:
browser_session: Browser session
cdp_session: CDP session with Page domain enabled
Returns:
Current URL or None
"""
try:
# Get the current frame tree to find the URL (using session_id)
result = await browser_session.cdp_client.send.Page.getFrameTree(session_id=cdp_session.session_id)
frame = result.get("frameTree", {}).get("frame", {})
return frame.get("url")
except Exception as e:
logger.debug(f"Could not get frame tree: {e}")
return None
async def _execute_fetch(
self,
request: SkillRequest,
params: dict[str, Any],
browser_session: "BrowserSession",
cdp_session: "CDPSession",
) -> SkillRunResult:
"""Execute fetch() via CDP Runtime.evaluate.
Uses session-scoped Runtime.evaluate to execute fetch in browser context.
Args:
request: Skill request configuration
params: Parameters to substitute
browser_session: Browser session
cdp_session: CDP session with Runtime domain enabled
Returns:
SkillRunResult with response data
"""
url = request.build_url(params)
# CRITICAL: Re-validate URL immediately before fetch to prevent DNS rebinding (TOCTOU)
# DNS could have been rebound from public to private IP since initial validation
try:
await validate_url_safe(url)
except ValueError as e:
logger.error(f"SSRF protection: URL validation failed at fetch time: {e}")
return SkillRunResult(success=False, error=f"SSRF blocked at fetch time: {e}")
options = request.to_fetch_options(params)
# Build JavaScript fetch code
js_code = self._build_fetch_js(url, options, request.response_type)
logger.debug(f"Executing fetch: {request.method} {url}")
try:
# Execute in browser context using session_id
result = await browser_session.cdp_client.send.Runtime.evaluate(
params={
"expression": js_code,
"awaitPromise": True,
"returnByValue": True,
"timeout": int(self.timeout * 1000),
},
session_id=cdp_session.session_id,
)
# Parse the result
if result.get("exceptionDetails"):
error = result["exceptionDetails"].get("text", "Unknown error")
logger.error(f"Fetch failed with exception: {error}")
return SkillRunResult(success=False, error=error)
value = result.get("result", {}).get("value", {})
if not value.get("ok"):
status = value.get("status", 0)
error_text = value.get("error") or value.get("body", "Request failed")
logger.warning(f"Fetch returned status {status}: {error_text}")
return SkillRunResult(
success=False,
status_code=status,
raw_response=value.get("body"),
error=f"HTTP {status}: {error_text[:100]}",
)
# Success - extract data
raw_body = value.get("body", "")
status_code = value.get("status", 200)
# Parse response based on type
parsed_data = self._parse_response(raw_body, request)
logger.info(f"Fetch succeeded: {status_code}, data extracted")
return SkillRunResult(
success=True,
data=parsed_data,
raw_response=raw_body,
status_code=status_code,
)
except Exception as e:
logger.error(f"Fetch execution failed: {e}")
return SkillRunResult(success=False, error=str(e))
def _build_fetch_js(self, url: str, options: dict[str, Any], response_type: str) -> str:
"""Build JavaScript code for fetch execution.
Args:
url: Request URL
options: Fetch options
response_type: Expected response type (json, html, text)
Returns:
JavaScript code string
"""
options_json = json.dumps(options)
# Build response handling based on type
if response_type == "json":
response_handler = "response.json()"
else:
response_handler = "response.text()"
return f"""
(async () => {{
let response;
try {{
response = await fetch({json.dumps(url)}, {options_json});
}} catch (error) {{
return {{
ok: false,
status: 0,
error: 'Fetch failed: ' + (error.message || String(error)),
}};
}}
// Capture status before attempting body parse (may fail for non-JSON)
const status = response.status;
const ok = response.ok;
try {{
const body = await {response_handler};
return {{
ok: ok,
status: status,
body: typeof body === 'string' ? body : JSON.stringify(body),
}};
}} catch (parseError) {{
// Body parsing failed - try to get raw text for error context
let rawBody = '';
try {{
rawBody = await response.clone().text();
}} catch (e) {{}}
return {{
ok: ok,
status: status,
body: rawBody,
error: 'Body parse failed: ' + (parseError.message || String(parseError)),
}};
}}
}})()
"""
def _parse_response(self, raw_body: str, request: SkillRequest) -> Any:
"""Parse response according to skill configuration.
Args:
raw_body: Raw response body
request: Skill request with parsing config
Returns:
Parsed/extracted data
"""
if request.response_type == "json":
try:
data = json.loads(raw_body) if isinstance(raw_body, str) else raw_body
# Extract using JMESPath if specified
if request.extract_path:
try:
return extract_data(data, request.extract_path)
except ValueError as e:
logger.warning(f"JMESPath extraction failed: {e}")
return data
return data
except json.JSONDecodeError as e:
logger.warning(f"JSON parse failed: {e}")
return raw_body
elif request.response_type == "html" and request.html_selectors:
# Parse HTML using BeautifulSoup
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(raw_body, "html.parser")
extracted: dict[str, list[str]] = {}
for name, selector in request.html_selectors.items():
elements = soup.select(selector)
extracted[name] = [el.get_text(strip=True) for el in elements if el.get_text(strip=True)]
logger.debug(f"HTML extraction: {len(extracted)} fields extracted")
return extracted
except ImportError:
logger.warning("BeautifulSoup not installed, returning raw HTML")
return raw_body
except Exception as e:
logger.warning(f"HTML parsing failed: {e}")
return raw_body
return raw_body
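    # Illustrative html_selectors extraction (names and markup are made up):
    #
    #   request.html_selectors = {"titles": "h2.title"}
    #   _parse_response('<h2 class="title">Widget</h2>', request)
    #   -> {"titles": ["Widget"]}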
def _should_recover_auth(self, result: SkillRunResult, auth_recovery: AuthRecovery) -> bool:
"""Check if auth recovery should be triggered.
Args:
result: Failed result
auth_recovery: Recovery configuration
Returns:
True if recovery should be triggered
"""
# Check status code
if result.status_code in auth_recovery.trigger_on_status:
return True
# Check response body for auth error text
if auth_recovery.trigger_on_body and result.raw_response:
if auth_recovery.trigger_on_body.lower() in result.raw_response.lower():
return True
return False
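# Minimal end-to-end sketch (assumes an active browser-use BrowserSession bound
# to `session` and a Skill loaded as `skill`; both names are illustrative):
#
#   runner = SkillRunner(timeout=15.0)
#   result = await runner.run(skill, {"id": "123"}, session)
#   if result.auth_recovery_triggered:
#       ...  # send the agent to skill.auth_recovery.recovery_page, then retry
#   elif result.success:
#       print(result.data)
#   else:
#       print(result.error)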