Skip to main content
Glama
tools.py (13.9 kB)
"""Playwright tools for browser automation.""" import asyncio import base64 import logging from pathlib import Path from typing import Any, Optional, Union from playwright.async_api import Page logger = logging.getLogger(__name__) class PlaywrightTools: """Collection of Playwright browser automation tools.""" async def navigate(self, page: Page, url: str) -> str: """Navigate to a URL.""" try: # Add timeout and better error handling response = await page.goto(url, wait_until="domcontentloaded", timeout=30000) if response is None: return f"Navigation to {url} failed: No response received" if response.status >= 400: return f"Navigation to {url} completed with status {response.status}" return f"Successfully navigated to: {url}" except Exception as e: error_msg = str(e) if "timeout" in error_msg.lower(): return f"Navigation to {url} timed out: {error_msg}" elif "net::" in error_msg: return f"Network error navigating to {url}: {error_msg}" else: return f"Failed to navigate to {url}: {error_msg}" async def click(self, page: Page, selector: str, timeout: int = 5000) -> str: """Click on an element.""" try: # First wait for the element to be visible and enabled await page.wait_for_selector(selector, timeout=timeout, state="visible") # Check if element is clickable element = await page.query_selector(selector) if not element: return f"Element {selector} not found" # Scroll into view if needed await element.scroll_into_view_if_needed() # Perform the click await page.click(selector, timeout=timeout) return f"Successfully clicked element: {selector}" except Exception as e: error_msg = str(e) if "timeout" in error_msg.lower(): return f"Timeout clicking element {selector}: element not clickable within {timeout}ms" elif "not visible" in error_msg.lower(): return f"Failed to click element {selector}: element is not visible" elif "intercept" in error_msg.lower(): return f"Failed to click element {selector}: element is intercepted by another element" else: return f"Failed to click 
element {selector}: {error_msg}" async def type_text(self, page: Page, selector: str, text: str, timeout: int = 5000) -> str: """Type text into an element.""" try: # Wait for element and ensure it's enabled await page.wait_for_selector(selector, timeout=timeout, state="visible") # Clear existing text first await page.fill(selector, "", timeout=timeout) # Type the new text await page.fill(selector, text, timeout=timeout) return f"Successfully typed text into element: {selector}" except Exception as e: error_msg = str(e) if "timeout" in error_msg.lower(): return f"Timeout typing into element {selector}: element not found or not enabled within {timeout}ms" elif "not editable" in error_msg.lower(): return f"Failed to type into element {selector}: element is not editable" else: return f"Failed to type text into element {selector}: {error_msg}" async def screenshot(self, page: Page, path: Optional[str] = None, full_page: bool = False) -> str: """Take a screenshot of the page.""" try: if path: # Ensure directory exists path_obj = Path(path) path_obj.parent.mkdir(parents=True, exist_ok=True) await page.screenshot(path=path, full_page=full_page) return f"Screenshot saved to: {path}" else: # Return base64 encoded screenshot with size limit screenshot_bytes = await page.screenshot(full_page=full_page) # Check size and warn if too large size_mb = len(screenshot_bytes) / (1024 * 1024) if size_mb > 5: # Warn if screenshot is larger than 5MB return f"Screenshot taken (size: {size_mb:.1f}MB - consider saving to file for large screenshots)" screenshot_b64 = base64.b64encode(screenshot_bytes).decode() return f"Screenshot taken (base64, size: {size_mb:.1f}MB): {screenshot_b64[:100]}..." 
except Exception as e: return f"Failed to take screenshot: {str(e)}" async def get_text(self, page: Page, selector: str, timeout: int = 5000) -> str: """Get text content from an element.""" try: # First try to get a single element element = await page.wait_for_selector(selector, timeout=timeout) if element: text = await element.text_content() return text or "" return "" except Exception as e: # If single element fails, try to get multiple elements try: elements = await page.query_selector_all(selector) if elements: texts = [] for element in elements: text = await element.text_content() if text: texts.append(text.strip()) return " | ".join(texts) if texts else "" return f"Failed to get text from element {selector}: {str(e)}" except Exception: return f"Failed to get text from element {selector}: {str(e)}" async def wait_for_selector(self, page: Page, selector: str, timeout: int = 30000, state: str = "visible") -> str: """Wait for an element to appear.""" valid_states = ["visible", "hidden", "attached", "detached"] if state not in valid_states: return f"Invalid state '{state}'. 
Must be one of: {', '.join(valid_states)}" try: await page.wait_for_selector(selector, timeout=timeout, state=state) return f"Element {selector} is now {state}" except Exception as e: error_msg = str(e) if "timeout" in error_msg.lower(): return f"Timeout waiting for element {selector} to be {state} (waited {timeout}ms)" else: return f"Failed to wait for element {selector}: {error_msg}" async def evaluate(self, page: Page, script: str) -> str: """Evaluate JavaScript in the browser.""" try: result = await page.evaluate(script) return str(result) except Exception as e: return f"Failed to evaluate script: {str(e)}" async def get_title(self, page: Page) -> str: """Get the title of the current page.""" try: title = await page.title() return title except Exception as e: return f"Failed to get page title: {str(e)}" async def get_url(self, page: Page) -> str: """Get the URL of the current page.""" try: url = page.url return url except Exception as e: return f"Failed to get page URL: {str(e)}" async def select_option(self, page: Page, selector: str, value: str, timeout: int = 5000) -> str: """Select an option from a dropdown.""" try: await page.select_option(selector, value, timeout=timeout) return f"Successfully selected option '{value}' in element: {selector}" except Exception as e: return f"Failed to select option in element {selector}: {str(e)}" async def check_checkbox(self, page: Page, selector: str, timeout: int = 5000) -> str: """Check a checkbox.""" try: await page.check(selector, timeout=timeout) return f"Successfully checked checkbox: {selector}" except Exception as e: return f"Failed to check checkbox {selector}: {str(e)}" async def uncheck_checkbox(self, page: Page, selector: str, timeout: int = 5000) -> str: """Uncheck a checkbox.""" try: await page.uncheck(selector, timeout=timeout) return f"Successfully unchecked checkbox: {selector}" except Exception as e: return f"Failed to uncheck checkbox {selector}: {str(e)}" async def hover(self, page: Page, selector: 
str, timeout: int = 5000) -> str: """Hover over an element.""" try: await page.hover(selector, timeout=timeout) return f"Successfully hovered over element: {selector}" except Exception as e: return f"Failed to hover over element {selector}: {str(e)}" async def scroll_to(self, page: Page, selector: str, timeout: int = 5000) -> str: """Scroll to an element.""" try: element = await page.wait_for_selector(selector, timeout=timeout) if element: await element.scroll_into_view_if_needed() return f"Successfully scrolled to element: {selector}" return f"Element not found: {selector}" except Exception as e: return f"Failed to scroll to element {selector}: {str(e)}" async def get_attribute(self, page: Page, selector: str, attribute: str, timeout: int = 5000) -> str: """Get an attribute value from an element.""" try: element = await page.wait_for_selector(selector, timeout=timeout) if element: value = await element.get_attribute(attribute) return value or "" return "" except Exception as e: return f"Failed to get attribute {attribute} from element {selector}: {str(e)}" async def wait_for_load_state(self, page: Page, state: str = "load") -> str: """Wait for a specific load state.""" valid_states = ["load", "domcontentloaded", "networkidle"] if state not in valid_states: return f"Invalid load state '{state}'. 
Must be one of: {', '.join(valid_states)}" try: await page.wait_for_load_state(state) return f"Page reached load state: {state}" except Exception as e: return f"Failed to wait for load state {state}: {str(e)}" async def go_back(self, page: Page) -> str: """Go back in browser history.""" try: await page.go_back() return "Successfully navigated back" except Exception as e: return f"Failed to go back: {str(e)}" async def go_forward(self, page: Page) -> str: """Go forward in browser history.""" try: await page.go_forward() return "Successfully navigated forward" except Exception as e: return f"Failed to go forward: {str(e)}" async def reload(self, page: Page) -> str: """Reload the current page.""" try: await page.reload() return "Successfully reloaded page" except Exception as e: return f"Failed to reload page: {str(e)}" async def fill_form(self, page: Page, form_data: dict[str, str], submit_selector: Optional[str] = None) -> str: """Fill multiple form fields and optionally submit the form.""" results = [] failed_fields = [] for selector, value in form_data.items(): try: await page.fill(selector, value, timeout=5000) results.append(f"✓ {selector}: filled") except Exception as e: failed_fields.append(f"✗ {selector}: {str(e)}") if submit_selector: try: await page.click(submit_selector, timeout=5000) results.append(f"✓ Form submitted via {submit_selector}") except Exception as e: results.append(f"✗ Failed to submit form: {str(e)}") summary = f"Filled {len(results) - len(failed_fields)}/{len(form_data)} fields" if failed_fields: summary += f" ({len(failed_fields)} failed)" return f"{summary}\n" + "\n".join(results + failed_fields) async def get_links(self, page: Page, limit: int = 50) -> str: """Get all links on the current page.""" try: links = await page.evaluate(f""" () => {{ const links = Array.from(document.querySelectorAll('a[href]')); return links.slice(0, {limit}).map(link => ({{ text: link.textContent.trim() || '[No text]', href: link.href, target: link.target || 
'_self' }})); }} """) if not links: return "No links found on the page" result_lines = [f"Found {len(links)} links:"] for i, link in enumerate(links, 1): result_lines.append(f"{i}. {link['text']} -> {link['href']}") if len(links) == limit: result_lines.append(f"(Limited to first {limit} links)") return "\n".join(result_lines) except Exception as e: return f"Failed to get links: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nolecram/Build_MCP_Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.