"""
Browser automation tools for Percepta MCP server.
"""
import asyncio
import base64
from typing import Dict, Any, Optional, List, Literal, cast
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
import logging
from ..config import Settings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class BrowserAutomation:
    """Browser automation using Playwright.

    A single headless Chromium browser, context and page are created lazily
    on first use and reused by every subsequent call. Public action methods
    never raise on failure: each returns a dict with ``"success": True`` and
    action-specific data, or ``"success": False`` and an ``"error"`` message.
    Call :meth:`close` to release the browser resources.
    """

    # Values Playwright accepts for ``wait_until`` in ``Page.goto``.
    _WAIT_UNTIL_STATES = ("commit", "domcontentloaded", "load", "networkidle")
    # Values Playwright accepts for ``state`` in ``Page.wait_for_selector``.
    _SELECTOR_STATES = ("attached", "detached", "visible", "hidden")
    # Image formats accepted by ``Page.screenshot``.
    _IMAGE_TYPES = ("png", "jpeg")

    def __init__(self, settings: "Settings") -> None:
        """Store *settings*; no browser is started until it is first needed."""
        self.settings = settings
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        self.playwright = None

    async def _ensure_browser(self) -> None:
        """Create whichever of playwright/browser/context/page is missing.

        Each piece is checked on its own so that a failure part-way through
        a previous initialization (e.g. the context could not be created) is
        retried on the next call instead of leaving the instance permanently
        without a page.
        """
        if self.browser is None:
            if self.playwright is None:
                self.playwright = await async_playwright().start()
            # Use Chromium by default
            self.browser = await self.playwright.chromium.launch(
                headless=True,
                args=['--no-sandbox', '--disable-dev-shm-usage'],
            )
        if self.context is None:
            self.context = await self.browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            )
        if self.page is None:
            self.page = await self.context.new_page()

    async def _get_page(self) -> "Page":
        """Return the shared page, initializing the browser first if needed.

        Raises:
            RuntimeError: If no page is available after initialization.
        """
        await self._ensure_browser()
        if not self.page:
            raise RuntimeError("Page not initialized")
        return self.page

    async def navigate(self, url: str, wait_for: str = "networkidle") -> Dict[str, Any]:
        """Navigate to a URL.

        Args:
            url: Address to load.
            wait_for: Load event to wait for; one of ``"commit"``,
                ``"domcontentloaded"``, ``"load"`` or ``"networkidle"``.

        Returns:
            On success: ``success``, final ``url``, page ``title`` and HTTP
            ``status`` (``None`` when Playwright returns no response).
            On failure (including an invalid *wait_for*): ``success`` False
            and ``error``.
        """
        try:
            # Reject bad values before paying for a browser start-up.
            if wait_for not in self._WAIT_UNTIL_STATES:
                raise ValueError(
                    f"Invalid wait_for value {wait_for!r}; "
                    f"expected one of {self._WAIT_UNTIL_STATES}"
                )
            page = await self._get_page()
            logger.info(f"Navigating to: {url}")
            response = await page.goto(
                url,
                wait_until=cast(
                    Literal["commit", "domcontentloaded", "load", "networkidle"],
                    wait_for,
                ),
                timeout=30000,
            )
            return {
                "success": True,
                "url": page.url,
                "title": await page.title(),
                "status": response.status if response else None,
            }
        except Exception as e:
            logger.error(f"Navigation error: {e}")
            return {
                "success": False,
                "error": str(e),
            }

    async def click(self, selector: str, timeout: int = 30000) -> Dict[str, Any]:
        """Click an element by selector.

        Args:
            selector: Selector of the element to click.
            timeout: Milliseconds to wait for the element to appear.

        Returns:
            ``success`` and ``selector``; plus ``error`` on failure.
        """
        try:
            page = await self._get_page()
            logger.info(f"Clicking element: {selector}")
            await page.wait_for_selector(selector, timeout=timeout)
            await page.click(selector)
            return {
                "success": True,
                "selector": selector,
            }
        except Exception as e:
            logger.error(f"Click error: {e}")
            return {
                "success": False,
                "error": str(e),
                "selector": selector,
            }

    async def fill(self, selector: str, text: str, timeout: int = 30000) -> Dict[str, Any]:
        """Fill a form field with text.

        Args:
            selector: Selector of the input element.
            text: Value to enter. Only its length is reported back and the
                value itself is not logged.
            timeout: Milliseconds to wait for the element to appear.

        Returns:
            ``success``, ``selector`` and ``text_length``; on failure
            ``success`` False, ``error`` and ``selector``.
        """
        try:
            page = await self._get_page()
            logger.info(f"Filling element {selector} with text")
            await page.wait_for_selector(selector, timeout=timeout)
            await page.fill(selector, text)
            return {
                "success": True,
                "selector": selector,
                "text_length": len(text),
            }
        except Exception as e:
            logger.error(f"Fill error: {e}")
            return {
                "success": False,
                "error": str(e),
                "selector": selector,
            }

    async def screenshot(
        self,
        full_page: bool = False,
        quality: int = 80,
        image_type: str = "png",
    ) -> Dict[str, Any]:
        """Take a screenshot of the current page.

        Args:
            full_page: Capture the full scrollable page instead of only the
                viewport.
            quality: JPEG quality, 0-100. Only used when *image_type* is
                ``"jpeg"``; it is not passed for PNG screenshots.
            image_type: ``"png"`` (default) or ``"jpeg"``.

        Returns:
            On success: ``success``, base64-encoded ``image``, ``mime_type``,
            ``url`` and ``title``. On failure: ``success`` False and
            ``error``.
        """
        try:
            if image_type not in self._IMAGE_TYPES:
                raise ValueError(
                    f"Invalid image_type {image_type!r}; "
                    f"expected one of {self._IMAGE_TYPES}"
                )
            options: Dict[str, Any] = {"full_page": full_page, "type": image_type}
            if image_type == "jpeg":
                if not 0 <= quality <= 100:
                    raise ValueError(
                        f"quality must be between 0 and 100, got {quality}"
                    )
                options["quality"] = quality

            page = await self._get_page()
            logger.info("Taking screenshot")
            screenshot_bytes = await page.screenshot(**options)
            # Convert to base64 for transmission
            screenshot_b64 = base64.b64encode(screenshot_bytes).decode('utf-8')
            return {
                "success": True,
                "image": screenshot_b64,
                "mime_type": f"image/{image_type}",
                "url": page.url,
                "title": await page.title(),
            }
        except Exception as e:
            logger.error(f"Screenshot error: {e}")
            return {
                "success": False,
                "error": str(e),
            }

    async def extract_text(
        self,
        selector: Optional[str] = None,
        timeout: int = 30000,
    ) -> Dict[str, Any]:
        """Extract text from the page or a specific element.

        Args:
            selector: Element to read. When omitted or empty, the text
                content of ``body`` is returned.
            timeout: Milliseconds to wait for *selector* to appear.

        Returns:
            On success: ``success``, ``text`` (empty string when there is no
            content), ``selector`` and ``url``. On failure: ``success``
            False, ``error`` and ``selector``.
        """
        try:
            page = await self._get_page()
            if selector:
                logger.info(f"Extracting text from element: {selector}")
                element = await page.wait_for_selector(selector, timeout=timeout)
                text = await element.text_content() if element else ""
            else:
                logger.info("Extracting all page text")
                text = await page.text_content('body')
            return {
                "success": True,
                "text": text or "",
                "selector": selector,
                "url": page.url,
            }
        except Exception as e:
            logger.error(f"Text extraction error: {e}")
            return {
                "success": False,
                "error": str(e),
                "selector": selector,
            }

    async def wait_for_element(
        self,
        selector: str,
        timeout: int = 30000,
        state: str = "visible",
    ) -> Dict[str, Any]:
        """Wait for an element to reach the requested state.

        Args:
            selector: Selector of the element to wait for.
            timeout: Maximum wait in milliseconds.
            state: One of ``"attached"``, ``"detached"``, ``"visible"`` or
                ``"hidden"``.

        Returns:
            On success: ``success``, ``selector`` and the ``state`` that was
            waited for. On failure (including an invalid *state*):
            ``success`` False, ``error`` and ``selector``.
        """
        try:
            if state not in self._SELECTOR_STATES:
                raise ValueError(
                    f"Invalid state {state!r}; "
                    f"expected one of {self._SELECTOR_STATES}"
                )
            page = await self._get_page()
            logger.info(f"Waiting for element: {selector}")
            await page.wait_for_selector(
                selector,
                timeout=timeout,
                state=cast(
                    Literal["attached", "detached", "visible", "hidden"],
                    state,
                ),
            )
            return {
                "success": True,
                "selector": selector,
                "state": state,
            }
        except Exception as e:
            logger.error(f"Wait for element error: {e}")
            return {
                "success": False,
                "error": str(e),
                "selector": selector,
            }

    async def evaluate_script(self, script: str) -> Dict[str, Any]:
        """Execute JavaScript on the page.

        Args:
            script: JavaScript source passed unchanged to ``Page.evaluate``.
                NOTE(review): the script runs with full access to the loaded
                page; callers are responsible for only passing trusted code.

        Returns:
            ``success`` and the evaluation ``result``; on failure ``success``
            False and ``error``.
        """
        try:
            page = await self._get_page()
            logger.info("Executing JavaScript")
            result = await page.evaluate(script)
            return {
                "success": True,
                "result": result,
            }
        except Exception as e:
            logger.error(f"Script execution error: {e}")
            return {
                "success": False,
                "error": str(e),
            }

    async def get_page_info(self) -> Dict[str, Any]:
        """Get current page information.

        Returns:
            ``success``, ``url``, ``title`` and ``viewport`` of the shared
            page; on failure ``success`` False and ``error``.
        """
        try:
            page = await self._get_page()
            return {
                "success": True,
                "url": page.url,
                "title": await page.title(),
                "viewport": page.viewport_size,
            }
        except Exception as e:
            logger.error(f"Get page info error: {e}")
            return {
                "success": False,
                "error": str(e),
            }

    async def close(self) -> None:
        """Close browser and cleanup resources.

        Page, context, browser and the Playwright driver are shut down in
        that order. Each step is attempted even if an earlier one fails, so
        one broken resource cannot leak the others; failures are logged and
        not raised. All references are cleared afterwards, so the instance
        can be reused and will start a fresh browser on the next call.
        """
        shutdown_steps = []
        if self.page:
            shutdown_steps.append(self.page.close)
        if self.context:
            shutdown_steps.append(self.context.close)
        if self.browser:
            shutdown_steps.append(self.browser.close)
        if self.playwright:
            shutdown_steps.append(self.playwright.stop)

        try:
            for step in shutdown_steps:
                try:
                    await step()
                except Exception as e:
                    logger.error(f"Browser cleanup error: {e}")
        finally:
            # Clear references even if cleanup is interrupted (e.g. cancelled).
            self.page = None
            self.context = None
            self.browser = None
            self.playwright = None