Skip to main content
Glama
server.py116 kB
"""Playwright MCP Server - Main server implementation.""" import asyncio import os import secrets import base64 import shutil import hashlib import json import logging import mimetypes import sys import time import uuid from collections import deque from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import dataclass from pathlib import Path from typing import Any, Deque, Dict, List, Optional, Tuple, Union from urllib.parse import urlparse, parse_qs from mcp.server.fastmcp import Context, FastMCP from playwright.async_api import ( Browser, BrowserContext, Page, Playwright, async_playwright, Request, Response, Route, ) from pydantic import BaseModel, Field logger = logging.getLogger(__name__) MAX_STORED_RESPONSE_HANDLES = 500 @dataclass class BrowserState: """Browser state container.""" playwright: Playwright browser: Browser context: BrowserContext page: Page # Current active page # Page management pages: Dict[str, Page] = None # Maps page ID to Page object current_page_id: str = None # ID of the current active page # Network monitoring state captured_requests: List[Dict[str, Any]] = None captured_responses: List[Dict[str, Any]] = None response_handles: Dict[str, Response] = None response_handle_order: Deque[str] = None def __post_init__(self): if self.pages is None: self.pages = {} if self.captured_requests is None: self.captured_requests = [] if self.captured_responses is None: self.captured_responses = [] if self.response_handles is None: self.response_handles = {} if self.response_handle_order is None: self.response_handle_order = deque() def get_current_page(self) -> Page: """Get the current active page.""" if self.current_page_id and self.current_page_id in self.pages: return self.pages[self.current_page_id] return self.page def set_current_page(self, page_id: str): """Set the current active page by ID.""" if page_id in self.pages: self.current_page_id = page_id self.page = self.pages[page_id] class NavigationResult(BaseModel): """Navigation operation result.""" success: bool url: str error: Optional[str] = None class CurrentUrlResult(BaseModel): """Current URL information result.""" success: bool url: Optional[str] = None parsed_url: Optional[Dict[str, Any]] = None query_params: Optional[Dict[str, List[str]]] = None error: Optional[str] = None class ElementQueryResult(BaseModel): """Element query result.""" found: bool count: int elements: List[Dict[str, Any]] = Field(default_factory=list) truncated: bool = False returned_count: Optional[int] = None error: Optional[str] = None class ElementMetaResult(BaseModel): """Lightweight element metadata result.""" found: bool count: int elements: List[Dict[str, Any]] = Field(default_factory=list) truncated: bool = False returned_count: Optional[int] = None error: Optional[str] = None class ScreenshotResult(BaseModel): """Screenshot result.""" success: bool data: Optional[str] = None # base64 encoded format: str = "png" error: Optional[str] = None artifact_path: Optional[str] = None byte_size: Optional[int] = None sha256: Optional[str] = None preview_base64: Optional[str] = None dimensions: Optional[Dict[str, Union[int, float]]] = None inline: bool = False class PDFResult(BaseModel): """PDF generation result.""" success: bool data: Optional[str] = None # base64 encoded error: Optional[str] = None artifact_path: Optional[str] = None byte_size: Optional[int] = None sha256: Optional[str] = None preview_base64: Optional[str] = None inline: bool = False class ScriptResult(BaseModel): """JavaScript execution result.""" success: bool result: Optional[Any] = None error: Optional[str] = None truncated: Optional[bool] = None preview: Optional[str] = None overflow_path: Optional[str] = None overflow_characters: Optional[int] = None class NetworkRequestResult(BaseModel): """Network request information result.""" url: str method: str headers: Dict[str, str] resource_type: str post_data: Optional[str] = None timestamp: Optional[float] = None class NetworkResponseResult(BaseModel): """Network response information result.""" url: str status: int status_text: str headers: Dict[str, str] body: Optional[str] = None timestamp: Optional[float] = None class CookieResult(BaseModel): """Cookie operation result.""" success: bool cookies: Optional[List[Dict[str, Any]]] = None error: Optional[str] = None class StorageResult(BaseModel): """Storage operation result.""" success: bool data: Optional[Dict[str, Any]] = None error: Optional[str] = None class PageInfo(BaseModel): """Information about a browser page/tab.""" page_id: str url: str title: str is_current: bool class PageListResult(BaseModel): """Result of listing all open pages.""" success: bool pages: List[PageInfo] = Field(default_factory=list) current_page_id: Optional[str] = None error: Optional[str] = None class PageSwitchResult(BaseModel): """Result of switching between pages.""" success: bool page_id: Optional[str] = None url: Optional[str] = None error: Optional[str] = None class NewPageResult(BaseModel): """Result of waiting for a new page to open.""" success: bool page_id: Optional[str] = None url: Optional[str] = None opener_page_id: Optional[str] = None error: Optional[str] = None class Config: """Server configuration.""" def __init__( self, headless: bool = True, browser_type: str = "chromium", timeout: int = 30000, viewport_width: int = 1920, viewport_height: int = 1080, channel: Optional[str] = None, user_data_dir: Optional[str] = None, max_elements_returned: int = 20, max_element_text_length: int = 2000, max_accessibility_nodes: int = 500, max_response_characters: int = 4000, preview_characters: int = 400, artifact_directory: Optional[str] = None, artifact_max_age_seconds: int = 7200, artifact_max_files: int = 200, artifact_chunk_size: int = 4096, ): self.headless = headless self.browser_type = browser_type self.timeout = timeout self.viewport_width = viewport_width self.viewport_height = viewport_height self.channel = channel self.user_data_dir = user_data_dir self.max_elements_returned = max_elements_returned self.max_element_text_length = max_element_text_length self.max_accessibility_nodes = max_accessibility_nodes self.max_response_characters = max_response_characters self.preview_characters = preview_characters # Base artifact directory; a per-session subdirectory is created at runtime. if artifact_directory: self.artifact_directory = Path(artifact_directory).expanduser() else: self.artifact_directory = (Path.cwd() / "tmp" / "playwright_mcp").expanduser() self.artifact_directory = self.artifact_directory.absolute() self.artifact_max_age_seconds = artifact_max_age_seconds self.artifact_max_files = artifact_max_files self.artifact_chunk_size = max(artifact_chunk_size, 256) # Global configuration config = Config() _restart_lock = asyncio.Lock() async def _shutdown_browser_state(state: Optional[BrowserState]): """Close browser/context and stop Playwright for a given state.""" if not state: return try: if state.context: await state.context.close() elif state.browser: await state.browser.close() except Exception as exc: logger.error("Error closing browser: %s", exc) try: if state.playwright: await state.playwright.stop() except Exception as exc: logger.error("Error stopping playwright: %s", exc) async def _create_browser_state() -> BrowserState: """Launch Playwright and return an initialized BrowserState based on config.""" playwright = await async_playwright().start() use_persistent_context = config.user_data_dir is not None browser = None context = None try: if use_persistent_context: launch_options = { "headless": config.headless, "viewport": { "width": config.viewport_width, "height": config.viewport_height, }, } if config.channel: launch_options["channel"] = config.channel if config.browser_type == "chromium": context = await playwright.chromium.launch_persistent_context( config.user_data_dir, **launch_options ) elif config.browser_type == "firefox": context = await playwright.firefox.launch_persistent_context( config.user_data_dir, **launch_options ) elif config.browser_type == "webkit": context = await playwright.webkit.launch_persistent_context( config.user_data_dir, **launch_options ) else: raise ValueError(f"Unsupported browser type: {config.browser_type}") browser = context.browser page = context.pages[0] if context.pages else await context.new_page() else: launch_options = {"headless": config.headless} if config.channel: launch_options["channel"] = config.channel if config.browser_type == "chromium": browser = await playwright.chromium.launch(**launch_options) elif config.browser_type == "firefox": browser = await playwright.firefox.launch(**launch_options) elif config.browser_type == "webkit": browser = await playwright.webkit.launch(**launch_options) else: raise ValueError(f"Unsupported browser type: {config.browser_type}") context = await browser.new_context( viewport={ "width": config.viewport_width, "height": config.viewport_height, } ) page = await context.new_page() page.set_default_timeout(config.timeout) state = BrowserState( playwright=playwright, browser=browser, context=context, page=page ) page_id = str(uuid.uuid4()) state.pages[page_id] = page state.current_page_id = page_id await _setup_page_tracking(state) await _setup_network_monitoring(state) logger.info("Browser started successfully") return state except Exception: # Best-effort cleanup on failure try: if context: await context.close() elif browser: await browser.close() except Exception: pass try: await playwright.stop() except Exception: pass raise def _compute_session_id() -> str: """Compute a best-effort unique session identifier. We prefer terminal-provided session hints when available (to keep stability per tab), and add process/timestamp/randomness to guarantee uniqueness across concurrent runs. """ hints: list[str] = [] for var in ( "MCP_SESSION_ID", # explicit override if provided "CODEX_SESSION_ID", "ITERM_SESSION_ID", "TERM_SESSION_ID", "TMUX_PANE", "SSH_TTY", ): val = os.getenv(var) if val: hints.append(val) try: if sys.stdin and sys.stdin.isatty(): # TTY device path is fairly unique per terminal tab hints.append(os.ttyname(sys.stdin.fileno())) except Exception: # pragma: no cover - defensive pass # Always salt with pid + coarse time + small random to avoid collisions hints.extend([ f"pid:{os.getpid()}", f"t:{int(time.time()*1000)}", f"r:{secrets.token_hex(4)}", ]) basis = "|".join(hints) return hashlib.sha256(basis.encode("utf-8")).hexdigest()[:16] def _session_artifact_dir(base: Path) -> Path: """Return the per-session artifact directory under a given base.""" sid = _compute_session_id() # Use a stable subfolder to group sessions, keeping base tidy return (base / "sessions" / sid).absolute() def _enforce_session_retention(artifact_base: Path) -> None: """Prune old per-session artifact directories under the base root. Uses the same age limit as file retention (config.artifact_max_age_seconds). If set to 0 or negative, session pruning is disabled. """ max_age = max(config.artifact_max_age_seconds, 0) if max_age <= 0: return sessions_root = (artifact_base / "sessions").expanduser().absolute() try: sessions_root.mkdir(parents=True, exist_ok=True) except Exception as exc: logger.warning("Unable to ensure sessions root %s: %s", sessions_root, exc) return try: entries = [p for p in sessions_root.iterdir() if p.is_dir()] except Exception as exc: logger.warning("Failed to enumerate sessions in %s: %s", sessions_root, exc) return now = time.time() for path in entries: try: age = now - path.stat().st_mtime if age > max_age: shutil.rmtree(path, ignore_errors=True) except Exception as exc: logger.debug("Failed to remove old session %s: %s", path, exc) @asynccontextmanager async def browser_lifespan(server: FastMCP) -> AsyncIterator[BrowserState]: """Manage browser lifecycle.""" logger.info("Starting browser...") state: Optional[BrowserState] = None try: state = await _create_browser_state() yield state finally: logger.info("Shutting down browser...") await _shutdown_browser_state(state) # Create FastMCP server with browser lifespan mcp = FastMCP("Playwright MCP Server", lifespan=browser_lifespan) async def _setup_page_tracking(state: BrowserState): """Set up page tracking for new tabs and popups.""" async def handle_new_page(page: Page): """Handle new page/tab creation.""" try: # Generate unique ID for the new page page_id = str(uuid.uuid4()) # Set default timeout for new page page.set_default_timeout(config.timeout) # Add to pages dictionary state.pages[page_id] = page # Set up network monitoring for the new page await _setup_network_monitoring_for_page(state, page) logger.info(f"New page opened with ID: {page_id}, URL: {page.url}") except Exception as e: logger.error(f"Error handling new page: {e}") # Listen for new pages (tabs, popups, etc.) state.context.on("page", handle_new_page) async def _setup_network_monitoring_for_page(state: BrowserState, page: Page): """Set up network monitoring for a specific page.""" async def handle_request(request: Request): """Capture request details.""" try: request_data = { "url": request.url, "method": request.method, "headers": await request.all_headers(), "resource_type": request.resource_type, "post_data": request.post_data, "timestamp": time.time(), } state.captured_requests.append(request_data) except Exception as e: logger.error(f"Error capturing request: {e}") async def handle_response(response: Response): """Capture response details.""" try: handle_id = uuid.uuid4().hex state.response_handles[handle_id] = response state.response_handle_order.append(handle_id) while len(state.response_handle_order) > MAX_STORED_RESPONSE_HANDLES: old_id = state.response_handle_order.popleft() state.response_handles.pop(old_id, None) response_data = { "url": response.url, "status": response.status, "status_text": response.status_text, "headers": await response.all_headers(), "timestamp": time.time(), "handle_id": handle_id, } state.captured_responses.append(response_data) except Exception as e: logger.error(f"Error capturing response: {e}") # Set up event listeners for this page page.on("request", handle_request) page.on("response", handle_response) async def _setup_network_monitoring(state: BrowserState): """Set up network request and response monitoring for initial page.""" await _setup_network_monitoring_for_page(state, state.page) def get_browser_state(ctx: Context) -> BrowserState: """Get browser state from context.""" return ctx.request_context.lifespan_context def get_current_page(ctx: Context) -> Page: """Get the current active page from context.""" browser_state = get_browser_state(ctx) return browser_state.get_current_page() def _truncate_text(value: Optional[str], max_length: int) -> Tuple[Optional[str], bool]: """Truncate string values according to configured limit.""" if value is None: return None, False if max_length <= 0 or len(value) <= max_length: return value, False return value[:max_length], True def _truncate_attributes( attributes: Dict[str, Any], max_length: int ) -> Tuple[Dict[str, Any], bool]: """Apply text truncation to string attribute values.""" truncated = False limited: Dict[str, Any] = {} for key, attr_value in attributes.items(): if isinstance(attr_value, str): limited_value, was_truncated = _truncate_text(attr_value, max_length) truncated = truncated or was_truncated limited[key] = limited_value else: limited[key] = attr_value return limited, truncated def _count_accessibility_nodes(node: Optional[Dict[str, Any]]) -> int: if not node: return 0 total = 0 stack = [node] while stack: current = stack.pop() total += 1 children = current.get("children") or [] stack.extend(children) return total def _prune_accessibility_snapshot( snapshot: Optional[Dict[str, Any]], max_nodes: int ) -> Tuple[Optional[Dict[str, Any]], bool, int]: if snapshot is None: return None, False, 0 if max_nodes <= 0: return snapshot, False, _count_accessibility_nodes(snapshot) included = 0 truncated = False def prune(node: Dict[str, Any]) -> Optional[Dict[str, Any]]: nonlocal included, truncated if included >= max_nodes: truncated = True return None node_copy = dict(node) included += 1 children = node_copy.get("children") or [] if children: pruned_children = [] for child in children: if included >= max_nodes: truncated = True break pruned_child = prune(child) if pruned_child is not None: pruned_children.append(pruned_child) if pruned_children: node_copy["children"] = pruned_children else: node_copy.pop("children", None) return node_copy pruned_snapshot = prune(snapshot) return pruned_snapshot, truncated, included def _resolve_limit(default_value: int, override: Optional[int]) -> int: if override is None: return default_value return override def _ensure_artifact_dir() -> Path: """Ensure the artifact directory exists and return it.""" directory = config.artifact_directory try: directory.mkdir(parents=True, exist_ok=True) except Exception as exc: logger.error("Unable to create artifact directory %s: %s", directory, exc) return directory def _make_artifact_path(label: str, suffix: str = ".json") -> Path: """Create a unique artifact file path for overflow payloads.""" timestamp = time.strftime("%Y%m%dT%H%M%S", time.gmtime()) safe_label = "".join(ch if ch.isalnum() or ch in {"-", "_"} else "_" for ch in label.lower()) safe_label = safe_label[:64] or "payload" suffix = suffix if suffix.startswith(".") else f".{suffix}" filename = f"{timestamp}_{safe_label}_{uuid.uuid4().hex}{suffix}" return _ensure_artifact_dir() / filename def _resolve_artifact_path(path: str) -> Path: """Resolve a path inside the artifact directory, ensuring it is safe.""" try: resolved_path = Path(path).expanduser().absolute() except Exception as exc: # pragma: no cover - defensive raise ValueError(f"Invalid path: {exc}") from exc artifact_root = _ensure_artifact_dir().absolute() try: resolved_path.relative_to(artifact_root) except ValueError as exc: raise PermissionError("Path is outside of the configured artifact directory") from exc if not resolved_path.exists(): raise FileNotFoundError("Artifact not found") return resolved_path def _is_probably_binary(data: bytes) -> bool: """Heuristic check for binary data.""" if not data: return False text_chars = {7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x7F)) control_bytes = sum(byte not in text_chars for byte in data) return control_bytes / max(len(data), 1) > 0.30 def _guess_suffix_from_mime(content_type: str, default: str) -> str: """Return a file suffix based on MIME type.""" if not content_type: return default guessed = mimetypes.guess_extension(content_type.split(";")[0].strip()) if guessed: return guessed if content_type.startswith("image/"): return ".png" if content_type.startswith("application/pdf"): return ".pdf" if content_type.startswith("text/") or content_type.endswith("json"): return ".txt" return default def _is_text_content(content_type: str) -> bool: """Determine whether a MIME type represents text content.""" if not content_type: return True ctype = content_type.split(";")[0].strip().lower() if ctype.startswith("text/"): return True if ctype.endswith("json") or ctype.endswith("+json"): return True if ctype.endswith("xml") or ctype.endswith("+xml"): return True return ctype in { "application/javascript", "application/x-javascript", "application/vnd.mozilla.xul+xml", } def _serialize_preview(value: Any, max_chars: int) -> str: """Return a string preview of a value limited to ``max_chars``.""" if max_chars <= 0: return "" try: serialized = json.dumps(value, ensure_ascii=False, default=str) except (TypeError, ValueError): serialized = str(value) if len(serialized) <= max_chars: return serialized return serialized[:max_chars] def _estimate_size(value: Any) -> int: """Estimate the serialized size of the value in characters.""" try: return len(json.dumps(value, ensure_ascii=False, default=str)) except (TypeError, ValueError): return len(str(value)) def _empty_like(value: Any) -> Any: """Return an empty placeholder of the same container type.""" if isinstance(value, list): return [] if isinstance(value, dict): return {} if isinstance(value, str): return "" return None def _shrink_value(value: Any, budget: int) -> Tuple[Any, bool]: """Shrink the value so that its serialized representation fits within ``budget`` characters.""" if budget <= 0: return _empty_like(value), True if isinstance(value, str): if len(value) <= budget: return value, False return value[:budget], True if isinstance(value, list): truncated = False reduced: List[Any] = [] for item in value: current_size = _estimate_size(reduced) remaining = max(budget - current_size - 1, 0) if remaining <= 0: truncated = True break shrunk_item, item_truncated = _shrink_value(item, remaining) candidate = reduced + [shrunk_item] if _estimate_size(candidate) > budget: truncated = True break reduced.append(shrunk_item) truncated = truncated or item_truncated return reduced, truncated if isinstance(value, dict): truncated = False reduced: Dict[Any, Any] = {} for key, item in value.items(): current_size = _estimate_size(reduced) key_overhead = _estimate_size({key: None}) - _estimate_size({}) remaining = max(budget - current_size - key_overhead, 0) if remaining <= 0: truncated = True break shrunk_item, item_truncated = _shrink_value(item, remaining) candidate = dict(reduced) candidate[key] = shrunk_item if _estimate_size(candidate) > budget: truncated = True break reduced[key] = shrunk_item truncated = truncated or item_truncated return reduced, truncated return value, False def _apply_response_budget( value: Any, *, budget: int, preview_limit: int, label: str, ) -> Tuple[Any, bool, Optional[str], Optional[str], int]: """Apply response budget constraints to the provided value.""" size = _estimate_size(value) if budget <= 0 or size <= budget: return value, False, None, None, size trimmed_value, trimmed_internally = _shrink_value(value, budget) # Ensure the trimmed value actually respects the budget; fallback to empty if not. if _estimate_size(trimmed_value) > budget: trimmed_value = _empty_like(value) trimmed_internally = True preview = _serialize_preview(trimmed_value, preview_limit) artifact_path = _make_artifact_path(label) serialized_value = None try: serialized_value = json.dumps(value, ensure_ascii=False, default=str, indent=2) except (TypeError, ValueError): serialized_value = str(value) artifact_path = artifact_path.with_suffix(".txt") try: artifact_path.write_text(serialized_value, encoding="utf-8") except Exception as exc: logger.error("Failed to write overflow artifact %s: %s", artifact_path, exc) artifact_path = None if artifact_path: _enforce_artifact_retention() return trimmed_value, True, preview, str(artifact_path) if artifact_path else None, size def _enforce_artifact_retention() -> None: """Apply retention policy to the artifact directory.""" directory = config.artifact_directory max_age = max(config.artifact_max_age_seconds, 0) max_files = config.artifact_max_files try: directory.mkdir(parents=True, exist_ok=True) except Exception as exc: logger.error("Unable to ensure artifact directory %s: %s", directory, exc) return try: files = [path for path in directory.iterdir() if path.is_file()] except Exception as exc: logger.error("Failed to enumerate artifact directory %s: %s", directory, exc) return now = time.time() # Remove files older than the configured age limit if max_age > 0: for path in list(files): try: if now - path.stat().st_mtime > max_age: path.unlink(missing_ok=True) files.remove(path) except Exception as exc: logger.warning("Failed to remove expired artifact %s: %s", path, exc) # Enforce maximum file count by removing oldest files first if max_files and max_files > 0 and len(files) > max_files: try: files.sort(key=lambda p: p.stat().st_mtime) except Exception as exc: logger.error("Failed to sort artifacts for retention: %s", exc) return for path in files[:-max_files]: try: path.unlink(missing_ok=True) except Exception as exc: logger.warning("Failed to remove excess artifact %s: %s", path, exc) META_ATTRIBUTE_KEYS = { "id", "class", "name", "role", "aria-label", "aria-labelledby", "aria-describedby", "data-testid", "data-test", "data-qa", } # Browser lifecycle tools @mcp.tool() async def restart_browser(ctx: Context, headed: bool = True) -> Dict[str, Any]: """Restart the browser in headed or headless mode.""" async with _restart_lock: state = get_browser_state(ctx) previous_headless = config.headless config.headless = not headed try: new_state = await _create_browser_state() except Exception as exc: config.headless = previous_headless logger.error("Failed to start new browser during restart: %s", exc) return {"success": False, "headed": headed, "error": str(exc)} try: await _shutdown_browser_state(state) except Exception as exc: logger.warning("Error shutting down previous browser during restart: %s", exc) state.playwright = new_state.playwright state.browser = new_state.browser state.context = new_state.context state.page = new_state.page state.pages = new_state.pages state.current_page_id = new_state.current_page_id state.captured_requests = new_state.captured_requests state.captured_responses = new_state.captured_responses state.response_handles = new_state.response_handles state.response_handle_order = new_state.response_handle_order return { "success": True, "headed": headed, "current_page_id": state.current_page_id, } # Navigation Tools @mcp.tool() async def navigate(url: str, ctx: Context) -> NavigationResult: """Navigate the browser to a specified URL. This tool loads a new page in the browser. It supports all standard URL formats including HTTP, HTTPS, file:// URLs, and data URLs. Args: url: The URL to navigate to (e.g., "https://example.com", "file:///path/to/file.html") ctx: MCP context containing the browser state Returns: NavigationResult with success status, final URL (after redirects), and any errors """ try: page = get_current_page(ctx) await page.goto(url) current_url = page.url return NavigationResult(success=True, url=current_url) except Exception as e: return NavigationResult(success=False, url="", error=str(e)) @mcp.tool() async def reload(ctx: Context) -> NavigationResult: """Reload the current page in the browser. This tool refreshes the current page, equivalent to pressing F5 or clicking the browser's reload button. All page state will be reset. Args: ctx: MCP context containing the browser state Returns: NavigationResult with success status, current URL, and any errors """ try: page = get_current_page(ctx) await page.reload() current_url = page.url return NavigationResult(success=True, url=current_url) except Exception as e: return NavigationResult(success=False, url="", error=str(e)) @mcp.tool() async def go_back(ctx: Context) -> NavigationResult: """Navigate back to the previous page in browser history. This tool moves back one step in the browser's navigation history, equivalent to clicking the browser's back button. Args: ctx: MCP context containing the browser state Returns: NavigationResult with success status, current URL after navigation, and any errors """ try: page = get_current_page(ctx) await page.go_back() current_url = page.url return NavigationResult(success=True, url=current_url) except Exception as e: return NavigationResult(success=False, url="", error=str(e)) @mcp.tool() async def go_forward(ctx: Context) -> NavigationResult: """Navigate forward to the next page in browser history. This tool moves forward one step in the browser's navigation history, equivalent to clicking the browser's forward button. Only works if there is a forward history (i.e., you've previously gone back). Args: ctx: MCP context containing the browser state Returns: NavigationResult with success status, current URL after navigation, and any errors """ try: page = get_current_page(ctx) await page.go_forward() current_url = page.url return NavigationResult(success=True, url=current_url) except Exception as e: return NavigationResult(success=False, url="", error=str(e)) @mcp.tool() async def get_current_url(ctx: Context) -> CurrentUrlResult: """Get the current page URL with parsed components and query parameters. This tool retrieves the current page URL and provides parsed information including the scheme, domain, path, and query parameters for easy access. Args: ctx: MCP context containing the browser state Returns: CurrentUrlResult with success status, URL, parsed components, query parameters, and any errors """ try: page = get_current_page(ctx) current_url = page.url # Parse the URL into components parsed = urlparse(current_url) parsed_url = { "scheme": parsed.scheme, "netloc": parsed.netloc, "hostname": parsed.hostname, "port": parsed.port, "path": parsed.path, "fragment": parsed.fragment, "query": parsed.query } # Parse query parameters query_params = parse_qs(parsed.query, keep_blank_values=True) return CurrentUrlResult( success=True, url=current_url, parsed_url=parsed_url, query_params=query_params ) except Exception as e: return CurrentUrlResult(success=False, error=str(e)) # Page Management Tools @mcp.tool() async def list_pages(ctx: Context) -> PageListResult: """List all open browser pages/tabs with their information. This tool returns information about all currently open pages including their IDs, URLs, titles, and which one is currently active. Use this to see all tabs/windows that have been opened by user actions or JavaScript. Args: ctx: MCP context containing the browser state Returns: PageListResult with list of all pages and their details """ try: browser_state = get_browser_state(ctx) pages_info = [] for page_id, page in browser_state.pages.items(): try: pages_info.append(PageInfo( page_id=page_id, url=page.url, title=await page.title(), is_current=(page_id == browser_state.current_page_id) )) except Exception as e: logger.error(f"Error getting info for page {page_id}: {e}") return PageListResult( success=True, pages=pages_info, current_page_id=browser_state.current_page_id ) except Exception as e: return PageListResult(success=False, error=str(e)) @mcp.tool() async def switch_page(page_id: str, ctx: Context) -> PageSwitchResult: """Switch to a different browser page/tab by its ID. This tool changes the active page that subsequent commands will operate on. Use list_pages first to get the available page IDs. Args: page_id: The ID of the page to switch to ctx: MCP context containing the browser state Returns: PageSwitchResult with success status and the new active page information """ try: browser_state = get_browser_state(ctx) if page_id not in browser_state.pages: return PageSwitchResult( success=False, error=f"Page with ID {page_id} not found" ) browser_state.set_current_page(page_id) page = browser_state.pages[page_id] return PageSwitchResult( success=True, page_id=page_id, url=page.url ) except Exception as e: return PageSwitchResult(success=False, error=str(e)) @mcp.tool() async def close_page(page_id: str, ctx: Context) -> Dict[str, Any]: """Close a specific browser page/tab by its ID. This tool closes the specified page. If it's the current page, the tool will automatically switch to another available page. Cannot close the last remaining page. Args: page_id: The ID of the page to close ctx: MCP context containing the browser state Returns: Dict with success status, closed page ID, and any error messages """ try: browser_state = get_browser_state(ctx) if page_id not in browser_state.pages: return { "success": False, "error": f"Page with ID {page_id} not found" } if len(browser_state.pages) <= 1: return { "success": False, "error": "Cannot close the last remaining page" } # Close the page page = browser_state.pages[page_id] await page.close() # Remove from tracking del browser_state.pages[page_id] # If this was the current page, switch to another if browser_state.current_page_id == page_id: # Get first available page new_page_id = next(iter(browser_state.pages.keys())) browser_state.set_current_page(new_page_id) return { "success": True, "closed_page_id": page_id, "current_page_id": browser_state.current_page_id } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() async def wait_for_popup(ctx: Context, timeout: int = 5000) -> NewPageResult: """Wait for a new popup window or tab to be opened. This tool waits for a new page to be created (e.g., from clicking a link with target="_blank" or JavaScript window.open()). It's useful when you know an action will open a new window/tab and you want to capture it. Args: timeout: Maximum time to wait in milliseconds (default: 5000) ctx: MCP context containing the browser state Returns: NewPageResult with information about the new page """ import asyncio import uuid try: browser_state = get_browser_state(ctx) # Store the current page ID as the opener opener_page_id = browser_state.current_page_id # Create a future to wait for the new page new_page_future = asyncio.Future() def handle_page(page: Page): """Handle the new page event.""" if not new_page_future.done(): new_page_future.set_result(page) # Temporarily listen for new pages browser_state.context.once("page", handle_page) try: # Wait for new page with timeout new_page = await asyncio.wait_for( new_page_future, timeout=timeout / 1000 # Convert to seconds ) # Generate ID for the new page page_id = str(uuid.uuid4()) # Set default timeout for new page new_page.set_default_timeout(config.timeout) # Add to pages dictionary browser_state.pages[page_id] = new_page # Set up network monitoring for the new page await _setup_network_monitoring_for_page(browser_state, new_page) # Wait a bit for the page to load initial content try: await new_page.wait_for_load_state("domcontentloaded", timeout=1000) except: pass # Page might still be loading, that's OK return NewPageResult( success=True, page_id=page_id, url=new_page.url, opener_page_id=opener_page_id ) except asyncio.TimeoutError: return NewPageResult( success=False, error=f"No new page opened within {timeout}ms" ) except Exception as e: return NewPageResult(success=False, error=str(e)) @mcp.tool() async def switch_to_latest_page(ctx: Context) -> PageSwitchResult: """Switch to the most recently opened page/tab. This is a convenience tool that switches to the newest page without needing to know its ID. Useful after clicking a link that opens in a new tab/window. Args: ctx: MCP context containing the browser state Returns: PageSwitchResult with success status and the new active page information """ try: browser_state = get_browser_state(ctx) if not browser_state.pages: return PageSwitchResult( success=False, error="No pages available" ) # Get the last page ID (most recently added) latest_page_id = list(browser_state.pages.keys())[-1] browser_state.set_current_page(latest_page_id) page = browser_state.pages[latest_page_id] return PageSwitchResult( success=True, page_id=latest_page_id, url=page.url ) except Exception as e: return PageSwitchResult(success=False, error=str(e)) # DOM Interaction Tools @mcp.tool() async def click(selector: str, ctx: Context) -> Dict[str, Any]: """Click an element on the page using a Playwright selector. This tool performs a left-click on the first element that matches the selector. Supports CSS selectors, text content, accessibility labels, and other Playwright selectors. Args: selector: Playwright selector to identify the element (e.g., "#button-id", "text=Click me", "[aria-label=Submit]") ctx: MCP context containing the browser state Returns: Dict with success status, selector used, and any error messages """ try: page = get_current_page(ctx) await page.click(selector) return {"success": True, "selector": selector} except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def type_text(selector: str, text: str, ctx: Context) -> Dict[str, Any]: """Type text into an input element character by character. This tool simulates human typing by sending individual keystrokes to the element. It does not clear existing text - use fill() for that behavior. Args: selector: Playwright selector for the input element (e.g., "input[name=username]", "#search-box") text: Text to type into the element ctx: MCP context containing the browser state Returns: Dict with success status, selector, text typed, and any error messages """ try: page = get_current_page(ctx) await page.type(selector, text) return {"success": True, "selector": selector, "text": text} except Exception as e: return {"success": False, "selector": selector, "text": text, "error": str(e)} @mcp.tool() async def fill(selector: str, value: str, ctx: Context) -> Dict[str, Any]: """Fill an input field with text, replacing any existing content. This tool clears the existing text and sets the new value in one operation. More efficient than type_text for replacing content entirely. Args: selector: Playwright selector for the input element (e.g., "input[type=email]", "textarea") value: Text value to set in the input field ctx: MCP context containing the browser state Returns: Dict with success status, selector, value set, and any error messages """ try: page = get_current_page(ctx) await page.fill(selector, value) return {"success": True, "selector": selector, "value": value} except Exception as e: return {"success": False, "selector": selector, "value": value, "error": str(e)} @mcp.tool() async def select_option(selector: str, value: str, ctx: Context) -> Dict[str, Any]: """Select an option from a dropdown/select element. This tool selects an option by value, label, or index from a <select> element. Args: selector: Playwright selector for the select element (e.g., "select[name=country]", "#dropdown") value: Option to select - can be the value attribute, visible text, or index (e.g., "US", "United States", "0") ctx: MCP context containing the browser state Returns: Dict with success status, selector, selected value, and any error messages """ try: page = get_current_page(ctx) await page.select_option(selector, value) return {"success": True, "selector": selector, "value": value} except Exception as e: return {"success": False, "selector": selector, "value": value, "error": str(e)} @mcp.tool() async def hover(selector: str, ctx: Context) -> Dict[str, Any]: """Move the mouse over an element to trigger hover effects. This tool simulates hovering the mouse cursor over an element, which can reveal tooltips, dropdown menus, or other hover-triggered content. Args: selector: Playwright selector for the element to hover over (e.g., ".menu-item", "[title=Help]") ctx: MCP context containing the browser state Returns: Dict with success status, selector, and any error messages """ try: page = get_current_page(ctx) await page.hover(selector) return {"success": True, "selector": selector} except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def scroll(selector: str, ctx: Context, x: int = 0, y: int = 0) -> Dict[str, Any]: """Scroll an element into view or scroll the page by specified amounts. If a selector is provided, scrolls that element into view. If selector is empty, performs a page scroll by the specified x and y pixel amounts. Args: selector: Playwright selector for element to scroll into view, or empty string for page scroll ctx: MCP context containing the browser state x: Horizontal scroll amount in pixels (for page scroll) y: Vertical scroll amount in pixels (for page scroll) Returns: Dict with success status, selector, scroll amounts, and any error messages """ try: page = get_current_page(ctx) if selector: element = await page.query_selector(selector) if element: await element.scroll_into_view_if_needed() return {"success": True, "selector": selector, "x": x, "y": y} else: return { "success": False, "selector": selector, "error": "Element not found", } else: await page.mouse.wheel(x, y) return {"success": True, "selector": selector, "x": x, "y": y} except Exception as e: return {"success": False, "selector": selector, "x": x, "y": y, "error": str(e)} # Element Discovery Tools @mcp.tool() async def query_selector( selector: str, ctx: Context, max_text_length: Optional[int] = None ) -> ElementQueryResult: """Find and return information about the first element matching a selector. This tool locates a single element on the page and returns detailed information including its tag name, text content, and all attributes. Args: selector: Playwright selector to find the element (e.g., "#main-header", "button:has-text('Submit')", "[data-testid=login]") ctx: MCP context containing the browser state max_text_length: Optional override for maximum characters returned for text and attributes (<=0 disables) Returns: ElementQueryResult with found status, element details (tag, text, attributes), and any errors """ try: page = get_current_page(ctx) element = await page.query_selector(selector) if element: # Get element attributes tag_name = await element.evaluate("el => el.tagName") text_content = await element.evaluate("el => el.textContent") attributes = await element.evaluate( "el => Object.fromEntries(Array.from(el.attributes).map(attr => [attr.name, attr.value]))" ) text_limit = _resolve_limit( config.max_element_text_length, max_text_length ) text_content, text_truncated = _truncate_text(text_content, text_limit) attributes, attributes_truncated = _truncate_attributes( attributes, text_limit ) element_info: Dict[str, Any] = { "tag_name": tag_name, "text_content": text_content, "attributes": attributes, } if text_truncated: element_info["text_truncated"] = True if attributes_truncated: element_info["attributes_truncated"] = True return ElementQueryResult( found=True, count=1, returned_count=1, truncated=text_truncated or attributes_truncated, elements=[element_info], ) else: return ElementQueryResult(found=False, count=0) except Exception as e: return ElementQueryResult(found=False, count=0, error=str(e)) @mcp.tool() async def query_selector_all( selector: str, ctx: Context, max_elements: Optional[int] = None, max_text_length: Optional[int] = None, ) -> ElementQueryResult: """Find and return information about all elements matching a selector. This tool locates all elements on the page that match the selector and returns detailed information for each one including tag names, text content, and attributes. Args: selector: Playwright selector to find elements (e.g., ".nav-item", "input[type=checkbox]", "li") ctx: MCP context containing the browser state max_elements: Optional override for number of elements returned (<=0 disables) max_text_length: Optional override for maximum characters returned per element (<=0 disables) Returns: ElementQueryResult with found status, count, array of element details, and any errors """ try: page = get_current_page(ctx) elements = await page.query_selector_all(selector) total_count = len(elements) element_limit = _resolve_limit(config.max_elements_returned, max_elements) if element_limit > 0: elements = elements[:element_limit] elements_info = [] any_truncated = total_count > len(elements) for element in elements: tag_name = await element.evaluate("el => el.tagName") text_content = await element.evaluate("el => el.textContent") attributes = await element.evaluate( "el => Object.fromEntries(Array.from(el.attributes).map(attr => [attr.name, attr.value]))" ) text_limit = _resolve_limit( config.max_element_text_length, max_text_length ) text_content, text_truncated = _truncate_text(text_content, text_limit) attributes, attributes_truncated = _truncate_attributes( attributes, text_limit ) if text_truncated or attributes_truncated: any_truncated = True element_info: Dict[str, Any] = { "tag_name": tag_name, "text_content": text_content, "attributes": attributes, } if text_truncated: element_info["text_truncated"] = True if attributes_truncated: element_info["attributes_truncated"] = True elements_info.append(element_info) return ElementQueryResult( found=total_count > 0, count=total_count, returned_count=len(elements_info), elements=elements_info, truncated=any_truncated, ) except Exception as e: return ElementQueryResult(found=False, count=0, error=str(e)) @mcp.tool() async def query_selector_meta( selector: str, ctx: Context, preview_length: int = 200, max_elements: Optional[int] = None, ) -> ElementMetaResult: """Return lightweight metadata for elements matching a selector. Provides tag name, role, a short text preview, and key attributes without dumping full text content. Useful for scouting before fetching full details. Args: selector: Playwright selector to find elements ctx: MCP context containing the browser state preview_length: Maximum characters to include in the text preview (<=0 disables) max_elements: Optional override for number of elements returned (<=0 disables) """ try: page = get_current_page(ctx) elements = await page.query_selector_all(selector) total_count = len(elements) element_limit = _resolve_limit(config.max_elements_returned, max_elements) if element_limit > 0: elements = elements[:element_limit] elements_info = [] any_truncated = total_count > len(elements) preview_limit = preview_length if preview_length is not None else 200 for element in elements: element_data = await element.evaluate( "el => ({\n tagName: el.tagName,\n role: el.getAttribute('role'),\n textContent: el.textContent,\n attributes: Array.from(el.attributes).reduce((acc, attr) => { acc[attr.name] = attr.value; return acc; }, {})\n })" ) raw_text = element_data.get("textContent") or "" if preview_limit <= 0: text_preview = "" text_truncated = len(raw_text) > 0 else: text_preview, text_truncated = _truncate_text( raw_text, preview_limit ) raw_attributes = element_data.get("attributes") or {} filtered_attributes = { key: value for key, value in raw_attributes.items() if key in META_ATTRIBUTE_KEYS and value is not None } if element_data.get("role") and "role" not in filtered_attributes: filtered_attributes["role"] = element_data["role"] element_info = { "tag_name": element_data.get("tagName"), "role": element_data.get("role"), "text_preview": text_preview, "attributes": filtered_attributes, "preview_length": preview_limit, } if text_truncated: element_info["text_truncated"] = True any_truncated = True elements_info.append(element_info) return ElementMetaResult( found=total_count > 0, count=total_count, returned_count=len(elements_info), elements=elements_info, truncated=any_truncated, ) except Exception as e: return ElementMetaResult(found=False, count=0, error=str(e)) # Snapshotting Tools @mcp.tool() async def get_html(ctx: Context) -> Dict[str, Any]: """Retrieve the complete HTML source of the current page. This tool returns the page HTML (doctype, head, body). The payload is trimmed to the configured response budget and linked to an overflow artifact when the full document would exceed inline limits. Args: ctx: MCP context containing the browser state Returns: Dict with success status and the complete HTML source as a string """ try: page = get_current_page(ctx) html = await page.content() ( budgeted_html, truncated, preview, overflow_path, original_size, ) = _apply_response_budget( html, budget=config.max_response_characters, preview_limit=config.preview_characters, label="page_html", ) result: Dict[str, Any] = { "success": True, "html": budgeted_html, "original_length": len(html), } if truncated: result["truncated"] = True result["preview"] = preview result["overflow_characters"] = original_size if overflow_path: result["overflow_path"] = overflow_path return result except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() async def get_accessibility_snapshot( ctx: Context, interesting_only: bool = True, root_selector: Optional[str] = None, max_nodes: Optional[int] = None, ) -> Dict[str, Any]: """Capture the accessibility tree structure of the current page. This tool returns the accessibility tree used by screen readers and other assistive technologies, including roles, names, and hierarchical structure. Args: ctx: MCP context containing the browser state interesting_only: Whether to return only nodes Playwright deems interesting (default True) root_selector: Optional selector to scope the snapshot to a specific subtree max_nodes: Optional override for the node cap applied to the returned snapshot (<=0 disables) Returns: Dict with success status and the accessibility tree snapshot structure """ try: page = get_current_page(ctx) root_element = None if root_selector: root_element = await page.query_selector(root_selector) if root_element is None: return { "success": False, "error": "Root selector not found", "selector": root_selector, } snapshot = await page.accessibility.snapshot( interesting_only=interesting_only, root=root_element, ) node_limit = _resolve_limit(config.max_accessibility_nodes, max_nodes) pruned_snapshot, node_truncated, node_count = _prune_accessibility_snapshot( snapshot, node_limit ) ( budgeted_snapshot, budget_truncated, preview, overflow_path, original_size, ) = _apply_response_budget( pruned_snapshot, budget=config.max_response_characters, preview_limit=config.preview_characters, label="accessibility_snapshot", ) result: Dict[str, Any] = { "success": True, "snapshot": budgeted_snapshot, "node_count": node_count, "interesting_only": interesting_only, } if root_selector: result["root_selector"] = root_selector if node_truncated: result["truncated"] = True result["max_nodes"] = node_limit if budget_truncated: result["truncated"] = True result["preview"] = preview if overflow_path: result["overflow_path"] = overflow_path result["overflow_characters"] = original_size return result except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() async def screenshot( ctx: Context, selector: Optional[str] = None, full_page: bool = False, inline: bool = False, preview_bytes: int = 1024, ) -> ScreenshotResult: """Capture a screenshot of the page or a specific element. This tool captures either the viewport, the full page, or a specific element. By default it stores the PNG in the artifact directory and returns metadata plus a short inline preview. Set ``inline=True`` to embed the full base64 payload like the legacy behaviour. Args: ctx: MCP context containing the browser state selector: Optional Playwright selector for a specific element to screenshot full_page: If True, captures the entire page including content below the fold inline: When True, return the entire screenshot as base64 (may consume many tokens) preview_bytes: Number of bytes to include in the inline preview when not in inline mode Returns: ScreenshotResult with success status plus either inline base64 data or an ``artifact_path`` pointing at the saved PNG. """ try: page = get_current_page(ctx) dimensions: Optional[Dict[str, Union[int, float]]] = None if selector: element = await page.query_selector(selector) if not element: return ScreenshotResult(success=False, error="Element not found") screenshot_bytes = await element.screenshot() try: box = await element.bounding_box() except Exception: # pragma: no cover - best effort only box = None if box: dimensions = {"width": box.get("width"), "height": box.get("height")} else: screenshot_bytes = await page.screenshot(full_page=full_page) viewport = page.viewport_size or {} if viewport: dimensions = { "width": viewport.get("width"), "height": viewport.get("height"), } byte_size = len(screenshot_bytes) sha256 = hashlib.sha256(screenshot_bytes).hexdigest() if inline: screenshot_base64 = base64.b64encode(screenshot_bytes).decode("utf-8") preview = ( screenshot_base64[: config.preview_characters] if config.preview_characters > 0 else None ) return ScreenshotResult( success=True, data=screenshot_base64, format="png", byte_size=byte_size, sha256=sha256, preview_base64=preview, dimensions=dimensions, inline=True, ) preview_slice = screenshot_bytes[: max(0, min(preview_bytes, byte_size))] preview_base64 = ( base64.b64encode(preview_slice).decode("utf-8") if preview_slice else None ) artifact_path = _make_artifact_path("screenshot", suffix=".png") try: artifact_path.write_bytes(screenshot_bytes) _enforce_artifact_retention() except Exception as exc: return ScreenshotResult(success=False, error=str(exc)) return ScreenshotResult( success=True, format="png", artifact_path=str(artifact_path), byte_size=byte_size, sha256=sha256, preview_base64=preview_base64, dimensions=dimensions, inline=False, ) except Exception as e: return ScreenshotResult(success=False, error=str(e)) @mcp.tool() async def pdf( ctx: Context, inline: bool = False, preview_bytes: int = 2048 ) -> PDFResult: """Generate a PDF document from the current page. This tool converts the current page to a PDF (Chromium only). By default the document is saved to the artifact directory and only metadata plus a short preview are returned. Set ``inline=True`` to embed the entire base64 payload when absolutely necessary. Args: ctx: MCP context containing the browser state inline: When True, return the full base64 PDF inline (large responses) preview_bytes: Number of bytes to expose in the inline preview when not embedding Returns: PDFResult with success status and either inline data or an ``artifact_path`` referencing the saved PDF. """ try: page = get_current_page(ctx) pdf_bytes = await page.pdf() byte_size = len(pdf_bytes) sha256 = hashlib.sha256(pdf_bytes).hexdigest() if inline: pdf_base64 = base64.b64encode(pdf_bytes).decode("utf-8") preview = ( pdf_base64[: config.preview_characters] if config.preview_characters > 0 else None ) return PDFResult( success=True, data=pdf_base64, byte_size=byte_size, sha256=sha256, preview_base64=preview, inline=True, ) preview_slice = pdf_bytes[: max(0, min(preview_bytes, byte_size))] preview_base64 = ( base64.b64encode(preview_slice).decode("utf-8") if preview_slice else None ) artifact_path = _make_artifact_path("pdf", suffix=".pdf") try: artifact_path.write_bytes(pdf_bytes) _enforce_artifact_retention() except Exception as exc: return PDFResult(success=False, error=str(exc)) return PDFResult( success=True, artifact_path=str(artifact_path), byte_size=byte_size, sha256=sha256, preview_base64=preview_base64, inline=False, ) except Exception as e: return PDFResult(success=False, error=str(e)) # Script Evaluation Tool @mcp.tool() async def evaluate(script: str, ctx: Context) -> ScriptResult: """Execute JavaScript code in the browser page context. This tool runs arbitrary JavaScript code within the page's context, allowing access to DOM, window objects, and page variables. Args: script: JavaScript code to execute (e.g., "document.title", "window.scrollTo(0, 100)") ctx: MCP context containing the browser state Returns: ScriptResult with success status, execution result (if any), and any errors """ try: page = get_current_page(ctx) result = await page.evaluate(script) ( processed_result, truncated, preview, overflow_path, original_size, ) = _apply_response_budget( result, budget=config.max_response_characters, preview_limit=config.preview_characters, label="evaluate_result", ) payload: Dict[str, Any] = { "success": True, "result": processed_result, } if truncated: payload["truncated"] = True payload["preview"] = preview if overflow_path: payload["overflow_path"] = overflow_path payload["overflow_characters"] = original_size return ScriptResult(**payload) except Exception as e: return ScriptResult(success=False, error=str(e)) # Element State & Validation Tools @mcp.tool() async def is_visible(selector: str, ctx: Context) -> Dict[str, Any]: """Check whether an element is visible on the page. This tool determines if an element is currently visible to users, taking into account CSS visibility, display properties, and whether it's within the viewport. Args: selector: Playwright selector for the element to check (e.g., "#modal", ".hidden-element") ctx: MCP context containing the browser state Returns: Dict with success status, selector, visibility boolean, and any errors """ try: page = get_current_page(ctx) element = await page.query_selector(selector) if element: visible = await element.is_visible() return {"success": True, "selector": selector, "visible": visible} else: return { "success": False, "selector": selector, "error": "Element not found", } except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def is_enabled(selector: str, ctx: Context) -> Dict[str, Any]: """Check whether an element is enabled and can be interacted with. This tool determines if form elements like buttons, inputs, and selects are enabled (not disabled) and can receive user interactions. Args: selector: Playwright selector for the element to check (e.g., "button[type=submit]", "input[name=email]") ctx: MCP context containing the browser state Returns: Dict with success status, selector, enabled boolean, and any errors """ try: page = get_current_page(ctx) element = await page.query_selector(selector) if element: enabled = await element.is_enabled() return {"success": True, "selector": selector, "enabled": enabled} else: return { "success": False, "selector": selector, "error": "Element not found", } except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def wait_for_element( selector: str, ctx: Context, timeout: int = 30000 ) -> Dict[str, Any]: """Wait for an element to appear in the DOM. This tool waits until an element matching the selector becomes available, useful for handling dynamic content that loads after page initialization. Args: selector: Playwright selector for the element to wait for (e.g., ".loading-complete", "[data-loaded=true]") ctx: MCP context containing the browser state timeout: Maximum wait time in milliseconds (default: 30000) Returns: Dict with success status, selector, timeout value, and any errors """ try: page = get_current_page(ctx) await page.wait_for_selector(selector, timeout=timeout) return {"success": True, "selector": selector, "timeout": timeout} except Exception as e: return { "success": False, "selector": selector, "timeout": timeout, "error": str(e), } @mcp.tool() async def wait_for_load_state( state: str, ctx: Context, timeout: int = 30000 ) -> Dict[str, Any]: """Wait for the page to reach a specific loading state. This tool waits for different stages of page loading to complete, ensuring content is ready before proceeding with interactions. Args: state: Loading state to wait for - "domcontentloaded" (DOM ready), "load" (all resources), or "networkidle" (no requests for 500ms) ctx: MCP context containing the browser state timeout: Maximum wait time in milliseconds (default: 30000) Returns: Dict with success status, target state, timeout value, and any errors """ try: page = get_current_page(ctx) await page.wait_for_load_state(state, timeout=timeout) return {"success": True, "state": state, "timeout": timeout} except Exception as e: return {"success": False, "state": state, "timeout": timeout, "error": str(e)} # Form & Input Handling Tools @mcp.tool() async def clear_text(selector: str, ctx: Context) -> Dict[str, Any]: """Clear all text content from an input field. This tool removes all existing text from text inputs, textareas, and other editable fields, leaving them empty. Args: selector: Playwright selector for the input element (e.g., "input[name=message]", "#comment-box") ctx: MCP context containing the browser state Returns: Dict with success status, selector, and any errors """ try: page = get_current_page(ctx) await page.fill(selector, "") return {"success": True, "selector": selector} except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def check_checkbox(selector: str, ctx: Context) -> Dict[str, Any]: """Check a checkbox or radio button element. This tool selects a checkbox or radio button, setting it to the checked state. If already checked, the operation has no effect. Args: selector: Playwright selector for the checkbox/radio element (e.g., "input[type=checkbox][name=agree]", "#newsletter") ctx: MCP context containing the browser state Returns: Dict with success status, selector, action performed, and any errors """ try: page = get_current_page(ctx) await page.check(selector) return {"success": True, "selector": selector, "action": "checked"} except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def uncheck_checkbox(selector: str, ctx: Context) -> Dict[str, Any]: """Uncheck a checkbox element. This tool deselects a checkbox, setting it to the unchecked state. If already unchecked, the operation has no effect. Args: selector: Playwright selector for the checkbox element (e.g., "input[type=checkbox][name=notifications]", ".privacy-checkbox") ctx: MCP context containing the browser state Returns: Dict with success status, selector, action performed, and any errors """ try: page = get_current_page(ctx) await page.uncheck(selector) return {"success": True, "selector": selector, "action": "unchecked"} except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def upload_file(selector: str, file_path: str, ctx: Context) -> Dict[str, Any]: """Upload a file through a file input element. This tool selects and uploads a file from the local filesystem to a file input element on the page. Args: selector: Playwright selector for the file input element (e.g., "input[type=file]", "#avatar-upload") file_path: Absolute path to the file to upload (e.g., "/path/to/document.pdf") ctx: MCP context containing the browser state Returns: Dict with success status, selector, file path, and any errors """ try: page = get_current_page(ctx) await page.set_input_files(selector, file_path) return {"success": True, "selector": selector, "file_path": file_path} except Exception as e: return { "success": False, "selector": selector, "file_path": file_path, "error": str(e), } @mcp.tool() async def press_key(key: str, ctx: Context) -> Dict[str, Any]: """Simulate pressing a keyboard key. This tool sends a key press event to the page, useful for shortcuts, navigation keys, or special key combinations. Args: key: Key to press (e.g., "Enter", "Escape", "Tab", "Control+s", "ArrowDown") ctx: MCP context containing the browser state Returns: Dict with success status, key pressed, and any errors """ try: page = get_current_page(ctx) await page.keyboard.press(key) return {"success": True, "key": key} except Exception as e: return {"success": False, "key": key, "error": str(e)} # Advanced Navigation Tools @mcp.tool() async def wait_for_url( url_pattern: str, ctx: Context, timeout: int = 30000 ) -> Dict[str, Any]: """Wait for the browser URL to match a specific pattern. This tool waits until the current page URL matches the provided pattern, useful for handling redirects or navigation completion. Args: url_pattern: URL pattern to match - can be exact URL, glob pattern, or regex (e.g., "**/dashboard", "https://example.com/success") ctx: MCP context containing the browser state timeout: Maximum wait time in milliseconds (default: 30000) Returns: Dict with success status, URL pattern, current URL, timeout, and any errors """ try: page = get_current_page(ctx) await page.wait_for_url(url_pattern, timeout=timeout) current_url = page.url return { "success": True, "url_pattern": url_pattern, "current_url": current_url, "timeout": timeout, } except Exception as e: return { "success": False, "url_pattern": url_pattern, "timeout": timeout, "error": str(e), } @mcp.tool() async def set_viewport_size(width: int, height: int, ctx: Context) -> Dict[str, Any]: """Change the browser viewport dimensions. This tool resizes the browser's viewport to simulate different screen sizes, useful for testing responsive designs or mobile layouts. Args: width: Viewport width in pixels (e.g., 1920, 768, 375) height: Viewport height in pixels (e.g., 1080, 1024, 667) ctx: MCP context containing the browser state Returns: Dict with success status, new dimensions, and any errors """ try: page = get_current_page(ctx) await page.set_viewport_size({"width": width, "height": height}) return {"success": True, "width": width, "height": height} except Exception as e: return {"success": False, "width": width, "height": height, "error": str(e)} # Element Discovery & Analysis Tools @mcp.tool() async def get_element_bounding_box(selector: str, ctx: Context) -> Dict[str, Any]: """Get the position and dimensions of an element. This tool returns the bounding box coordinates and size of an element, useful for layout analysis or positioning calculations. Args: selector: Playwright selector for the element (e.g., "#main-content", ".sidebar") ctx: MCP context containing the browser state Returns: Dict with success status, selector, bounding box (x, y, width, height), and any errors """ try: page = get_current_page(ctx) element = await page.query_selector(selector) if element: box = await element.bounding_box() return {"success": True, "selector": selector, "bounding_box": box} else: return { "success": False, "selector": selector, "error": "Element not found", } except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def get_element_attributes(selector: str, ctx: Context) -> Dict[str, Any]: """Retrieve all HTML attributes of an element. This tool returns a dictionary of all attributes and their values for the specified element, useful for inspecting element properties. Args: selector: Playwright selector for the element (e.g., "img[alt]", "a.external-link") ctx: MCP context containing the browser state Returns: Dict with success status, selector, attributes dictionary, and any errors """ try: page = get_current_page(ctx) element = await page.query_selector(selector) if element: attributes = await element.evaluate( "el => Object.fromEntries(Array.from(el.attributes).map(attr => [attr.name, attr.value]))" ) return {"success": True, "selector": selector, "attributes": attributes} else: return { "success": False, "selector": selector, "error": "Element not found", } except Exception as e: return {"success": False, "selector": selector, "error": str(e)} @mcp.tool() async def get_computed_style( selector: str, property: str, ctx: Context ) -> Dict[str, Any]: """Get the computed CSS value for a specific style property of an element. This tool retrieves the final computed CSS value after all stylesheets and inheritance rules have been applied. Args: selector: Playwright selector for the element (e.g., ".header", "#main-nav") property: CSS property name (e.g., "color", "fontSize", "display", "margin-top") ctx: MCP context containing the browser state Returns: Dict with success status, selector, property name, computed value, and any errors """ try: page = get_current_page(ctx) element = await page.query_selector(selector) if element: style_value = await element.evaluate( f"el => getComputedStyle(el).{property}" ) return { "success": True, "selector": selector, "property": property, "value": style_value, } else: return { "success": False, "selector": selector, "error": "Element not found", } except Exception as e: return { "success": False, "selector": selector, "property": property, "error": str(e), } # Network & Debugging Tools @mcp.tool() async def wait_for_network_idle(ctx: Context, timeout: int = 30000) -> Dict[str, Any]: """Wait for network activity to become idle. This tool waits until there are no network requests for at least 500ms, indicating that dynamic content loading has completed. Args: ctx: MCP context containing the browser state timeout: Maximum wait time in milliseconds (default: 30000) Returns: Dict with success status, timeout value, and any errors """ try: page = get_current_page(ctx) await page.wait_for_load_state("networkidle", timeout=timeout) return {"success": True, "timeout": timeout} except Exception as e: return {"success": False, "timeout": timeout, "error": str(e)} @mcp.tool() async def get_page_errors(ctx: Context) -> Dict[str, Any]: """Retrieve JavaScript errors that occurred on the page. This tool returns any JavaScript errors that have been captured during page execution. Note: Error collection requires browser setup with listeners. Args: ctx: MCP context containing the browser state Returns: Dict with success status, array of error messages, and any errors """ try: page = get_current_page(ctx) # Note: This would require setting up error listeners during browser initialization # For now, we'll evaluate to check for any stored errors errors = await page.evaluate( """ () => { if (window.pageErrors) { return window.pageErrors; } return []; } """ ) return {"success": True, "errors": errors} except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() async def get_console_logs(ctx: Context) -> Dict[str, Any]: """Retrieve console log messages from the page. This tool returns console.log, console.error, and other console messages that occurred during page execution. Note: Log collection requires browser setup. Args: ctx: MCP context containing the browser state Returns: Dict with success status, array of console messages, setup note, and any errors """ try: page = get_current_page(ctx) # Note: This would require setting up console listeners during browser initialization # For now, we'll return a message about setup requirements logs = await page.evaluate( """ () => { if (window.consoleLogs) { return window.consoleLogs; } return []; } """ ) return { "success": True, "logs": logs, "note": "Console logging requires browser setup with listeners", } except Exception as e: return {"success": False, "error": str(e)} # Network Monitoring & Interception Tools @mcp.tool() async def get_network_requests( ctx: Context, url_pattern: Optional[str] = None ) -> Dict[str, Any]: """Retrieve captured network requests with optional URL filtering. This tool returns all network requests that have been captured since the page started loading, including details like method, headers, and post data. Args: ctx: MCP context containing the browser state url_pattern: Optional glob pattern to filter requests by URL (e.g., "*/api/*", "https://example.com/*") Returns: Dict with success status, array of request details, and any errors """ try: import fnmatch browser_state = get_browser_state(ctx) requests = browser_state.captured_requests if url_pattern: requests = [ req for req in requests if fnmatch.fnmatch(req["url"], url_pattern) ] total_count = len(requests) ( clipped_requests, truncated, preview, overflow_path, original_size, ) = _apply_response_budget( requests, budget=config.max_response_characters, preview_limit=config.preview_characters, label="network_requests", ) result: Dict[str, Any] = { "success": True, "requests": clipped_requests, "count": total_count, } if isinstance(clipped_requests, list): result["returned_count"] = len(clipped_requests) if truncated: result["truncated"] = True result["preview"] = preview if overflow_path: result["overflow_path"] = overflow_path result["overflow_characters"] = original_size return result except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() async def get_network_responses( ctx: Context, url_pattern: Optional[str] = None ) -> Dict[str, Any]: """Retrieve captured network responses with optional URL filtering. This tool returns all network responses that have been captured since the page started loading, including status codes, headers, and response details. Args: ctx: MCP context containing the browser state url_pattern: Optional glob pattern to filter responses by URL (e.g., "*/api/*", "*.json") Returns: Dict with success status, array of response details, and any errors """ try: import fnmatch browser_state = get_browser_state(ctx) responses = browser_state.captured_responses if url_pattern: responses = [ resp for resp in responses if fnmatch.fnmatch(resp["url"], url_pattern) ] total_count = len(responses) ( clipped_responses, truncated, preview, overflow_path, original_size, ) = _apply_response_budget( responses, budget=config.max_response_characters, preview_limit=config.preview_characters, label="network_responses", ) result: Dict[str, Any] = { "success": True, "responses": clipped_responses, "count": total_count, } if isinstance(clipped_responses, list): result["returned_count"] = len(clipped_responses) if truncated: result["truncated"] = True result["preview"] = preview if overflow_path: result["overflow_path"] = overflow_path result["overflow_characters"] = original_size return result except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() async def clear_network_logs(ctx: Context) -> Dict[str, Any]: """Clear all captured network request and response logs. This tool resets the network monitoring logs, useful for starting fresh network monitoring for specific page interactions. Args: ctx: MCP context containing the browser state Returns: Dict with success status and cleared counts """ try: browser_state = get_browser_state(ctx) request_count = len(browser_state.captured_requests) response_count = len(browser_state.captured_responses) browser_state.captured_requests.clear() browser_state.captured_responses.clear() browser_state.response_handles.clear() browser_state.response_handle_order.clear() return { "success": True, "cleared_requests": request_count, "cleared_responses": response_count, } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() async def intercept_route( ctx: Context, url_pattern: str, action: str, status_code: Optional[int] = 200, response_body: Optional[str] = None, response_headers: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """Intercept network requests matching a URL pattern and handle them. This tool allows intercepting network requests and either blocking them, fulfilling them with custom responses, or modifying them before they continue. Args: ctx: MCP context containing the browser state url_pattern: Glob pattern to match URLs (e.g., "**/api/users", "https://example.com/*") action: Action to take - "block", "fulfill", or "continue" status_code: HTTP status code for fulfilled responses (default: 200) response_body: Response body for fulfilled requests response_headers: Custom headers for fulfilled responses Returns: Dict with success status, intercepted pattern, action, and any errors """ try: page = get_current_page(ctx) async def route_handler(route: Route, request: Request): if action == "block": await route.abort() elif action == "fulfill": await route.fulfill( status=status_code or 200, body=response_body or "", headers=response_headers or {}, ) else: # continue await route.continue_() await page.route(url_pattern, route_handler) return { "success": True, "url_pattern": url_pattern, "action": action, "status_code": status_code, } except Exception as e: return {"success": False, "url_pattern": url_pattern, "error": str(e)} @mcp.tool() async def unroute_all(ctx: Context) -> Dict[str, Any]: """Remove all route interceptors. This tool clears all previously set up route interceptors, allowing network requests to proceed normally. Args: ctx: MCP context containing the browser state Returns: Dict with success status and any errors """ try: page = get_current_page(ctx) await page.unroute_all() return {"success": True} except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() async def wait_for_response( ctx: Context, url_pattern: str, timeout: int = 30000 ) -> Dict[str, Any]: """Wait for a network response matching the specified URL pattern. This tool waits for a specific network response to occur, useful for waiting for API calls or resource loading to complete. Args: ctx: MCP context containing the browser state url_pattern: Glob pattern or URL to wait for (e.g., "**/api/data", "https://example.com/endpoint") timeout: Maximum wait time in milliseconds (default: 30000) Returns: Dict with success status, response details (URL, status), and any errors """ try: page = get_current_page(ctx) response = await page.wait_for_response( url_pattern, timeout=timeout ) return { "success": True, "url": response.url, "status": response.status, "status_text": response.status_text, "headers": await response.all_headers(), } except Exception as e: return { "success": False, "url_pattern": url_pattern, "timeout": timeout, "error": str(e), } @mcp.tool() async def get_response_body(ctx: Context, url_pattern: str) -> Dict[str, Any]: """Get the response body for the most recent response matching a URL pattern. This tool retrieves network response bodies. Text payloads are trimmed with the shared response budget; binary bodies are written to the artifact store and returned by reference. Args: ctx: MCP context containing the browser state url_pattern: Glob pattern to match response URLs Returns: Dict with success status, response body (as text), and any errors """ try: import fnmatch browser_state = get_browser_state(ctx) matching_responses = [ resp for resp in reversed(browser_state.captured_responses) if fnmatch.fnmatch(resp["url"], url_pattern) ] if not matching_responses: return {"success": False, "error": "No matching responses found"} response_info = matching_responses[0] handle_id = response_info.get("handle_id") response_obj = browser_state.response_handles.get(handle_id) if handle_id else None if response_obj is None: return { "success": False, "error": "No live response handle available. Re-run the request before fetching the body.", } body_bytes = await response_obj.body() headers = await response_obj.all_headers() content_type = headers.get("content-type", "") if not content_type: content_type = response_info.get("headers", {}).get("content-type", "") byte_size = len(body_bytes) sha256 = hashlib.sha256(body_bytes).hexdigest() if _is_text_content(content_type): text = body_bytes.decode("utf-8", errors="replace") ( processed_text, truncated, preview, overflow_path, original_size, ) = _apply_response_budget( text, budget=config.max_response_characters, preview_limit=config.preview_characters, label="response_body", ) result: Dict[str, Any] = { "success": True, "url": response_obj.url, "body": processed_text, "content_type": content_type, "byte_size": byte_size, "sha256": sha256, "headers": headers, } if truncated: result["truncated"] = True result["preview"] = preview result["overflow_characters"] = original_size if overflow_path: result["overflow_path"] = overflow_path return result suffix = _guess_suffix_from_mime(content_type, ".bin") artifact_path = _make_artifact_path("response_body", suffix=suffix) try: artifact_path.write_bytes(body_bytes) _enforce_artifact_retention() except Exception as exc: return {"success": False, "error": str(exc)} preview_slice = body_bytes[: min(byte_size, 512)] preview_base64 = ( base64.b64encode(preview_slice).decode("utf-8") if preview_slice else None ) return { "success": True, "url": response_obj.url, "content_type": content_type, "byte_size": byte_size, "sha256": sha256, "artifact_path": str(artifact_path), "preview_base64": preview_base64, "binary": True, "headers": headers, } except Exception as e: return {"success": False, "url_pattern": url_pattern, "error": str(e)} def _read_artifact_chunk_impl(path: Path, offset: int, limit: int) -> Dict[str, Any]: """Read a bounded chunk from an artifact file.""" byte_size = max(path.stat().st_size, 0) start = min(max(offset, 0), byte_size) chunk_size = max(limit, 1) with path.open("rb") as handle: handle.seek(start) data = handle.read(chunk_size) is_binary = _is_probably_binary(data) if data else path.suffix not in {".txt", ".json", ".log"} payload: Dict[str, Any] = { "path": str(path), "offset": start, "bytes_read": len(data), "byte_size": byte_size, "binary": bool(is_binary), "next_offset": start + len(data) if start + len(data) < byte_size else None, } if not data: payload["content"] = "" payload["binary"] = False return payload if is_binary: payload["content_base64"] = base64.b64encode(data).decode("utf-8") else: payload["content"] = data.decode("utf-8", errors="replace") return payload @mcp.tool() async def read_artifact(path: str, ctx: Context) -> Dict[str, Any]: """Read an artifact produced by response budgeting. The first chunk of the artifact is returned using the configured inline budget. Use :func:`read_artifact_chunk` for additional data when ``truncated`` is true. """ del ctx # Maintains consistent tool signature; context not needed here. limit = config.max_response_characters if limit <= 0: limit = config.artifact_chunk_size try: resolved_path = _resolve_artifact_path(path) payload = _read_artifact_chunk_impl(resolved_path, offset=0, limit=limit) truncated = payload.get("next_offset") is not None result: Dict[str, Any] = {"success": True, **payload} if truncated: result["truncated"] = True result["message"] = "Content truncated. Use read_artifact_chunk for additional data." return result except (ValueError, PermissionError, FileNotFoundError) as exc: return {"success": False, "error": str(exc)} except Exception as exc: # pragma: no cover - defensive return {"success": False, "error": str(exc)} @mcp.tool() async def read_artifact_chunk( path: str, ctx: Context, offset: int = 0, limit: Optional[int] = None ) -> Dict[str, Any]: """Read a bounded chunk from an artifact file. Returns decoded text when the bytes look textual, otherwise base64 payloads so binary content stays compact. ``next_offset`` indicates where to resume. """ del ctx # Tool does not need the browser context. try: resolved_path = _resolve_artifact_path(path) chunk_limit = limit if limit and limit > 0 else config.artifact_chunk_size payload = _read_artifact_chunk_impl(resolved_path, offset=offset, limit=chunk_limit) return {"success": True, **payload} except (ValueError, PermissionError, FileNotFoundError) as exc: return {"success": False, "error": str(exc)} except Exception as exc: # pragma: no cover - defensive return {"success": False, "error": str(exc)} # Cookie Management Tools @mcp.tool() async def get_cookies(ctx: Context, urls: Optional[List[str]] = None) -> CookieResult: """Retrieve cookies from the browser context. This tool returns cookies from the current browser context, with optional filtering by specific URLs or domains. Args: ctx: MCP context containing the browser state urls: Optional list of URLs to filter cookies by domain Returns: CookieResult with success status, array of cookie objects, and any errors """ try: browser_state = get_browser_state(ctx) cookies = await browser_state.context.cookies(urls or []) return CookieResult(success=True, cookies=cookies) except Exception as e: return CookieResult(success=False, error=str(e)) @mcp.tool() async def add_cookies(ctx: Context, cookies: List[Dict[str, Any]]) -> CookieResult: """Add cookies to the browser context. This tool adds one or more cookies to the current browser context, making them available for all pages in the context. Args: ctx: MCP context containing the browser state cookies: List of cookie objects with name, value, domain, path, etc. Example: [{"name": "session", "value": "abc123", "domain": "example.com"}] Returns: CookieResult with success status and any errors """ try: browser_state = get_browser_state(ctx) await browser_state.context.add_cookies(cookies) return CookieResult(success=True, cookies=cookies) except Exception as e: return CookieResult(success=False, error=str(e)) @mcp.tool() async def clear_cookies( ctx: Context, name: Optional[str] = None, domain: Optional[str] = None ) -> CookieResult: """Clear cookies from the browser context. This tool removes cookies from the current browser context, with optional filtering by name or domain to clear specific cookies. Args: ctx: MCP context containing the browser state name: Optional cookie name to clear specific cookie domain: Optional domain to clear cookies from specific domain Returns: CookieResult with success status and any errors """ try: browser_state = get_browser_state(ctx) # Build the filter for clearing cookies clear_filter = {} if name: clear_filter["name"] = name if domain: clear_filter["domain"] = domain await browser_state.context.clear_cookies(**clear_filter) return CookieResult(success=True, cookies=[{"cleared_filter": clear_filter}]) except Exception as e: return CookieResult(success=False, error=str(e)) # Storage Management Tools @mcp.tool() async def get_local_storage( ctx: Context, origin: Optional[str] = None ) -> StorageResult: """Retrieve localStorage data from the current page or specified origin. This tool accesses browser localStorage data, useful for inspecting client-side stored data and application state. Args: ctx: MCP context containing the browser state origin: Optional origin URL to get storage from (defaults to current page) Returns: StorageResult with success status, localStorage data as dict, and any errors """ try: browser_state = get_browser_state(ctx) page = get_current_page(ctx) if origin: # Get storage for specific origin storage_state = await browser_state.context.storage_state() origin_storage = {} for origin_data in storage_state.get("origins", []): if origin_data["origin"] == origin: origin_storage = { item["name"]: item["value"] for item in origin_data.get("localStorage", []) } break data = origin_storage else: # Get localStorage for current page data = await page.evaluate( """ () => { const storage = {}; for (let i = 0; i < localStorage.length; i++) { const key = localStorage.key(i); storage[key] = localStorage.getItem(key); } return storage; } """ ) return StorageResult(success=True, data=data) except Exception as e: return StorageResult(success=False, error=str(e)) @mcp.tool() async def set_local_storage(ctx: Context, key: str, value: str) -> StorageResult: """Set a localStorage item in the current page. This tool stores data in the browser's localStorage for the current page, useful for setting up application state or test data. Args: ctx: MCP context containing the browser state key: Storage key name value: Storage value to set Returns: StorageResult with success status, set data, and any errors """ try: page = get_current_page(ctx) await page.evaluate( """ (args) => localStorage.setItem(args.key, args.value) """, {"key": key, "value": value}, ) return StorageResult(success=True, data={key: value}) except Exception as e: return StorageResult(success=False, error=str(e)) @mcp.tool() async def get_session_storage(ctx: Context) -> StorageResult: """Retrieve sessionStorage data from the current page. This tool accesses browser sessionStorage data, useful for inspecting session-specific client-side stored data. Args: ctx: MCP context containing the browser state Returns: StorageResult with success status, sessionStorage data as dict, and any errors """ try: page = get_current_page(ctx) data = await page.evaluate( """ () => { const storage = {}; for (let i = 0; i < sessionStorage.length; i++) { const key = sessionStorage.key(i); storage[key] = sessionStorage.getItem(key); } return storage; } """ ) return StorageResult(success=True, data=data) except Exception as e: return StorageResult(success=False, error=str(e)) @mcp.tool() async def set_session_storage(ctx: Context, key: str, value: str) -> StorageResult: """Set a sessionStorage item in the current page. This tool stores data in the browser's sessionStorage for the current page, useful for setting up session-specific test data. Args: ctx: MCP context containing the browser state key: Storage key name value: Storage value to set Returns: StorageResult with success status, set data, and any errors """ try: page = get_current_page(ctx) await page.evaluate( """ (args) => sessionStorage.setItem(args.key, args.value) """, {"key": key, "value": value}, ) return StorageResult(success=True, data={key: value}) except Exception as e: return StorageResult(success=False, error=str(e)) @mcp.tool() async def clear_storage(ctx: Context, storage_type: str = "both") -> StorageResult: """Clear localStorage and/or sessionStorage from the current page. This tool removes stored data from the browser storage, useful for resetting application state between test scenarios. Args: ctx: MCP context containing the browser state storage_type: Type of storage to clear - "local", "session", or "both" (default) Returns: StorageResult with success status, cleared storage types, and any errors """ try: # Need the active page to execute storage clearing script page = get_current_page(ctx) script = "" if storage_type in ["local", "both"]: script += "localStorage.clear();" if storage_type in ["session", "both"]: script += "sessionStorage.clear();" await page.evaluate(script) return StorageResult(success=True, data={"cleared": storage_type}) except Exception as e: return StorageResult(success=False, error=str(e)) # Request Headers & User Agent Tools @mcp.tool() async def set_extra_headers(ctx: Context, headers: Dict[str, str]) -> Dict[str, Any]: """Set additional HTTP headers for all requests in the browser context. This tool adds extra HTTP headers that will be sent with all network requests from pages in the current browser context. Args: ctx: MCP context containing the browser state headers: Dictionary of header name-value pairs to add Returns: Dict with success status, set headers, and any errors """ try: page = get_current_page(ctx) await page.set_extra_http_headers(headers) return {"success": True, "headers": headers} except Exception as e: return {"success": False, "headers": headers, "error": str(e)} @mcp.tool() async def set_user_agent(ctx: Context, user_agent: str) -> Dict[str, Any]: """Set the User-Agent header for the browser context. This tool changes the User-Agent string that will be sent with requests, useful for testing different browser/device behaviors or bypassing restrictions. Args: ctx: MCP context containing the browser state user_agent: User-Agent string to use Returns: Dict with success status, set user agent, and any errors """ try: page = get_current_page(ctx) await page.set_user_agent(user_agent) return {"success": True, "user_agent": user_agent} except Exception as e: return {"success": False, "user_agent": user_agent, "error": str(e)} def main(): """Main entry point.""" import argparse parser = argparse.ArgumentParser(description="Playwright MCP Server") parser.add_argument("transport", choices=["stdio", "http"], help="Transport type") parser.add_argument( "--port", type=int, default=8000, help="Port for HTTP transport" ) parser.add_argument("--headed", action="store_true", help="Run in headed mode") parser.add_argument( "--browser", choices=["chromium", "firefox", "webkit"], default="chromium", help="Browser type", ) parser.add_argument( "--timeout", type=int, default=30000, help="Default timeout (ms)" ) parser.add_argument( "--channel", choices=[ "chrome", "chrome-beta", "chrome-dev", "chrome-canary", "msedge", "msedge-beta", "msedge-dev", "msedge-canary", ], help="Browser channel (use real Chrome/Edge instead of bundled Chromium)", ) parser.add_argument( "--user-data-dir", type=str, help="Path to Chrome user data directory (enables persistent context with your profile)", ) parser.add_argument( "--max-elements", type=int, default=config.max_elements_returned, help="Maximum number of elements returned by query selector tools (<=0 disables)", ) parser.add_argument( "--max-element-text-length", type=int, default=config.max_element_text_length, help="Maximum characters returned for element text/attributes (<=0 disables)", ) parser.add_argument( "--max-accessibility-nodes", type=int, default=config.max_accessibility_nodes, help="Maximum nodes included in accessibility snapshots (<=0 disables)", ) parser.add_argument( "--max-response-chars", type=int, default=config.max_response_characters, help="Maximum characters returned inline before saving overflow to an artifact (<=0 disables)", ) parser.add_argument( "--preview-chars", type=int, default=config.preview_characters, help="Maximum characters included in inline previews for truncated payloads (<=0 disables)", ) parser.add_argument( "--artifact-dir", type=str, default=str(config.artifact_directory), help=( "Artifact root directory. A per-session subdirectory is created " "automatically to avoid collisions between concurrent Codex/MCP runs" ), ) parser.add_argument( "--artifact-max-age-seconds", type=int, default=config.artifact_max_age_seconds, help="Maximum age (in seconds) before overflow artifacts are deleted (<=0 disables)", ) parser.add_argument( "--artifact-max-files", type=int, default=config.artifact_max_files, help="Maximum number of overflow artifacts to retain (<=0 disables)", ) parser.add_argument( "--artifact-chunk-size", type=int, default=config.artifact_chunk_size, help="Default number of bytes returned by artifact chunk reads", ) args = parser.parse_args() # Update global configuration config.headless = not args.headed config.browser_type = args.browser config.timeout = args.timeout config.channel = args.channel config.user_data_dir = getattr(args, "user_data_dir", None) config.max_elements_returned = args.max_elements config.max_element_text_length = args.max_element_text_length config.max_accessibility_nodes = args.max_accessibility_nodes config.max_response_characters = args.max_response_chars config.preview_characters = args.preview_chars # Compute per-session artifact directory under the provided base/root. artifact_base = Path(args.artifact_dir).expanduser().absolute() session_dir = _session_artifact_dir(artifact_base) config.artifact_directory = session_dir config.artifact_max_age_seconds = max(args.artifact_max_age_seconds, 0) config.artifact_max_files = args.artifact_max_files config.artifact_chunk_size = max(args.artifact_chunk_size, 256) # Setup logging before emitting any log lines logging.basicConfig(level=logging.INFO) # Ensure the session directory exists and apply retention within the session only. _enforce_artifact_retention() # Prune old session directories across all sessions at startup (TTL-based). _enforce_session_retention(artifact_base) logger.info("Artifact base: %s", str(artifact_base)) logger.info("Using per-session artifact dir: %s", str(config.artifact_directory)) # Run the server using FastMCP's run method with transport if args.transport == "stdio": mcp.run() else: # HTTP transport using StreamableHTTP import uvicorn app = mcp.streamable_http_app() uvicorn.run(app, host="0.0.0.0", port=args.port) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/alexrwilliam/playwright-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server