MCP Operator

by willer
Verified
#!/usr/bin/env python3
"""
Computer implementations for the OpenAI Computer Use Agent (CUA)
"""

import asyncio
import base64
from typing import List, Dict, Tuple, Literal, Protocol, Any
from urllib.parse import urlparse


# Structural interface that every CUA computer backend must satisfy
class AsyncComputer(Protocol):
    """Methods and properties a computer must expose to be driven by the CUA loop.

    This is a ``typing.Protocol``: implementations do not need to inherit from
    it, they only need to provide members with matching signatures.
    """

    @property
    def environment(self) -> Literal["browser"]:
        """Kind of environment being controlled (only ``"browser"`` is declared)."""

    @property
    def dimensions(self) -> Tuple[int, int]:
        """Size of the controlled surface as a pair of integers."""

    async def screenshot(self) -> str:
        """Capture the current screen and return it as a string."""

    async def click(self, x: int, y: int, button: str = "left") -> None:
        """Click at ``(x, y)`` with the named button."""

    async def double_click(self, x: int, y: int) -> None:
        """Double-click at ``(x, y)``."""

    async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        """Scroll by ``(scroll_x, scroll_y)`` with the pointer at ``(x, y)``."""

    async def type(self, text: str) -> None:
        """Type ``text`` on the keyboard."""

    async def wait(self, ms: int = 1000) -> None:
        """Pause for ``ms`` milliseconds."""

    async def move(self, x: int, y: int) -> None:
        """Move the pointer to ``(x, y)``."""

    async def keypress(self, keys: List[str]) -> None:
        """Press the given keys."""

    async def drag(self, path: List[Dict[str, int]]) -> None:
        """Drag the pointer along ``path``, a list of ``{"x": ..., "y": ...}`` points."""

    async def get_current_url(self) -> str:
        """Return the URL currently being displayed."""

    async def goto(self, url: str) -> None:
        """Navigate to ``url``."""
class AsyncPlaywrightComputer:
    """Base implementation for Playwright-based browser automation using the Async API.

    Intended to be used as an async context manager: entering starts
    Playwright, obtains a browser/page from ``_get_browser_and_page`` (supplied
    by a subclass) and installs a request filter that aborts requests to hosts
    outside ``allowed_domains``; exiting closes the browser and stops
    Playwright.
    """

    environment: Literal["browser"] = "browser"
    dimensions = (1280, 1024)  # Default dimensions for the browser

    # Resource types that may be loaded from a CDN-looking host even when that
    # host is not in the allow-list.
    _ESSENTIAL_RESOURCE_TYPES = ("stylesheet", "script", "font", "image", "fetch", "xhr", "other")
    # Substrings that mark a hostname as a CDN.
    # NOTE(review): this is a substring test, so e.g. "cdn.anything.example"
    # qualifies; kept as-is because it is deliberately permissive, but it
    # weakens the allow-list for the resource types above.
    _CDN_HINTS = ("cdn", "jsdelivr", "cloudflare", "unpkg", "googleapis", "fontawesome")
    # Blocked requests of these types are not logged, to reduce noise.
    _QUIET_RESOURCE_TYPES = ("image", "stylesheet", "font")

    # Upper-cased key names as sent by the agent -> Playwright key names.
    _KEY_MAPPING: Dict[str, str] = {
        "CTRL": "Control",
        "CONTROL": "Control",
        "CMD": "Meta",
        "COMMAND": "Meta",
        "META": "Meta",
        "WIN": "Meta",
        "SUPER": "Meta",
        "ESC": "Escape",
        "ESCAPE": "Escape",
        "ALT": "Alt",
        "OPTION": "Alt",
        "SHIFT": "Shift",
        "TAB": "Tab",
        "ENTER": "Enter",
        "RETURN": "Enter",
        "BACKSPACE": "Backspace",
        "DELETE": "Delete",
        "HOME": "Home",
        "END": "End",
        "PAGEUP": "PageUp",
        "PAGEDOWN": "PageDown",
        "ARROWUP": "ArrowUp",
        "ARROWDOWN": "ArrowDown",
        "ARROWLEFT": "ArrowLeft",
        "ARROWRIGHT": "ArrowRight",
        "UP": "ArrowUp",
        "DOWN": "ArrowDown",
        "LEFT": "ArrowLeft",
        "RIGHT": "ArrowRight",
        "SPACE": " ",
    }

    def __init__(self, allowed_domains=None):
        """Store the allow-list; no browser is started until ``__aenter__``.

        Args:
            allowed_domains: Domain names whose hosts (and subdomains) may be
                requested. Falls back to ``['about:blank']`` when empty/None.
        """
        self._playwright = None
        self._browser = None
        self._page = None
        self.allowed_domains = allowed_domains or ['about:blank']

    async def __aenter__(self):
        """Start Playwright, create the browser/page and install domain blocking."""
        from playwright.async_api import async_playwright

        self._playwright = await async_playwright().start()
        try:
            self._browser, self._page = await self._get_browser_and_page()
            # Every request made by the page goes through the allow-list filter.
            await self._page.route("**/*", self._handle_route)
        except BaseException:
            # Don't leave the Playwright driver (or a half-created browser)
            # running when setup fails or is cancelled.
            await self.__aexit__(None, None, None)
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the browser and stop Playwright; safe to call more than once."""
        browser, playwright = self._browser, self._playwright
        self._browser = self._page = self._playwright = None
        try:
            if browser:
                await browser.close()
        finally:
            # Stop the driver even if closing the browser failed.
            if playwright:
                await playwright.stop()

    @staticmethod
    def _host_matches(hostname: str, domain: str) -> bool:
        """Return True when ``hostname`` equals ``domain`` or is a subdomain of it.

        Matching happens on a label boundary, so ``evilexample.com`` does not
        match ``example.com``. Empty hostnames or domains never match.
        """
        host = (hostname or "").lower().rstrip(".")
        suffix = (domain or "").lower().strip(".")
        if not host or not suffix:
            return False
        return host == suffix or host.endswith("." + suffix)

    def _is_request_allowed(self, hostname: str, resource_type: str, page_url: str = "") -> bool:
        """Decide whether a request to ``hostname`` may proceed.

        A request is allowed when any of the following holds:
        the host is covered by ``allowed_domains``; the resource type is one of
        the essential types and the host looks like a CDN; or the host is
        the current page's host or one of its subdomains.

        Args:
            hostname: Host part of the request URL ("" when it has none).
            resource_type: Playwright resource type of the request.
            page_url: URL of the page issuing the request, if known.
        """
        hostname = (hostname or "").lower()

        if any(self._host_matches(hostname, domain) for domain in self.allowed_domains):
            return True

        if resource_type in self._ESSENTIAL_RESOURCE_TYPES and any(
            hint in hostname for hint in self._CDN_HINTS
        ):
            return True

        # Special case for assets served from the main application domain.
        try:
            main_domain = urlparse(page_url or "").hostname
        except ValueError:
            main_domain = None
        return bool(main_domain) and self._host_matches(hostname, main_domain)

    async def _handle_route(self, route, request) -> None:
        """Route handler: continue allowed requests, abort everything else."""
        url = request.url
        resource_type = request.resource_type
        try:
            hostname = urlparse(url).hostname or ""
        except ValueError:
            # Unparseable URL: treat as host-less, which is never allowed.
            hostname = ""
        try:
            page_url = self._page.url
        except Exception:
            # If the current page URL is unavailable, skip the main-domain rule.
            page_url = ""

        if self._is_request_allowed(hostname, resource_type, page_url):
            await route.continue_()
            return

        # Every disallowed request is aborted; only the log line is suppressed
        # for noisy resource types.
        if resource_type not in self._QUIET_RESOURCE_TYPES:
            print(f"Blocking disallowed domain: {url}")
        await route.abort()

    async def screenshot(self) -> str:
        """Capture the visible viewport and return it as base64-encoded text."""
        png_bytes = await self._page.screenshot(full_page=False)
        return base64.b64encode(png_bytes).decode("utf-8")

    async def click(self, x: int, y: int, button: str = "left") -> None:
        """Click at the specified coordinates.

        ``button`` values ``"back"`` and ``"forward"`` navigate the history,
        ``"wheel"`` issues a mouse-wheel event, and anything other than
        ``"right"``/``"middle"`` is treated as a left click.
        """
        if button == "back":
            await self.back()
        elif button == "forward":
            await self.forward()
        elif button == "wheel":
            # NOTE(review): x/y are forwarded as the wheel deltas here.
            await self._page.mouse.wheel(x, y)
        else:
            button_type = button if button in ("left", "right", "middle") else "left"
            await self._page.mouse.click(x, y, button=button_type)

    async def double_click(self, x: int, y: int) -> None:
        """Double-click at the specified coordinates."""
        await self._page.mouse.dblclick(x, y)

    async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        """Move the pointer to ``(x, y)`` and scroll the window by the given deltas."""
        await self._page.mouse.move(x, y)
        # Pass the deltas as arguments instead of interpolating them into the
        # script, so they can never be interpreted as JavaScript source.
        await self._page.evaluate(
            "([dx, dy]) => window.scrollBy(dx, dy)", [scroll_x, scroll_y]
        )

    async def type(self, text: str) -> None:
        """Type text with the keyboard."""
        await self._page.keyboard.type(text)

    async def wait(self, ms: int = 1000) -> None:
        """Wait for a specified time in milliseconds."""
        await asyncio.sleep(ms / 1000)

    async def move(self, x: int, y: int) -> None:
        """Move the mouse to the specified coordinates."""
        await self._page.mouse.move(x, y)

    @classmethod
    def _normalize_key(cls, key: str) -> str:
        """Translate an agent key name to Playwright's (case-insensitive lookup).

        Names without a mapping are returned unchanged.
        """
        return cls._KEY_MAPPING.get(key.upper(), key)

    async def keypress(self, keys: List[str]) -> None:
        """Press ``keys`` together as one combination.

        All keys are held down in order and released in reverse order, so
        ``["CTRL", "A"]`` produces Ctrl+A. A single key behaves like a plain
        press.
        """
        held: List[str] = []
        try:
            for key in keys:
                mapped_key = self._normalize_key(key)
                await self._page.keyboard.down(mapped_key)
                held.append(mapped_key)
        finally:
            # Release whatever was pressed, even if a later key was rejected.
            for mapped_key in reversed(held):
                await self._page.keyboard.up(mapped_key)

    async def drag(self, path: List[Dict[str, int]]) -> None:
        """Perform a drag along ``path`` (a list of ``{"x": ..., "y": ...}`` points).

        Does nothing for an empty path.
        """
        if not path:
            return
        await self._page.mouse.move(path[0]["x"], path[0]["y"])
        await self._page.mouse.down()
        try:
            for point in path[1:]:
                await self._page.mouse.move(point["x"], point["y"])
        finally:
            # Never leave the button held if a move fails part-way.
            await self._page.mouse.up()

    async def get_current_url(self) -> str:
        """Get the current page URL."""
        return self._page.url

    async def goto(self, url: str) -> None:
        """Navigate to a URL; navigation errors are printed, not raised."""
        try:
            await self._page.goto(url, wait_until="domcontentloaded", timeout=30000)
        except Exception as e:
            print(f"Error navigating to {url}: {e}")

    async def back(self) -> None:
        """Go back in browser history."""
        await self._page.go_back()

    async def forward(self) -> None:
        """Go forward in browser history."""
        await self._page.go_forward()

    async def _get_browser_and_page(self):
        """Set up browser and page - must be implemented by child classes.

        Returns:
            A ``(browser, page)`` pair.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Child classes must implement this method")


class AsyncLocalPlaywrightComputer(AsyncPlaywrightComputer):
    """Implementation of a local Playwright (Chromium) browser using the Async API."""

    def __init__(self, headless: bool = True, width: int = 1280, height: int = 1024, allowed_domains=None):
        """Configure the local browser.

        Args:
            headless: Launch Chromium without a visible window.
            width: Viewport/window width.
            height: Viewport/window height.
            allowed_domains: Forwarded to ``AsyncPlaywrightComputer``.
        """
        super().__init__(allowed_domains=allowed_domains)
        self.headless = headless
        self.dimensions = (width, height)

    async def _get_browser_and_page(self):
        """Launch Chromium, create a permissive context and return ``(browser, page)``."""
        width, height = self.dimensions

        # NOTE(review): these flags disable same-origin and certificate checks
        # for the whole browser; kept because they are explicit choices here.
        launch_args = [
            f"--window-size={width},{height}",
            "--disable-extensions",
            "--disable-web-security",  # Allow cross-domain cookies
            "--allow-running-insecure-content",  # Allow mixed content
            "--ignore-certificate-errors",  # Ignore SSL errors
        ]

        browser = await self._playwright.chromium.launch(
            headless=self.headless,
            args=launch_args,
        )

        # Browser context with more permissive settings
        context = await browser.new_context(
            viewport={"width": width, "height": height},
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            ignore_https_errors=True,  # Ignore HTTPS errors
            accept_downloads=True,  # Accept downloads
        )

        # Grant permissions up front so the browser behaves more like a
        # human user's browser.
        permissions = [
            'geolocation',
            'notifications',
            'camera',
            'microphone',
            'clipboard-read',
            'clipboard-write',
        ]
        await context.grant_permissions(permissions)

        page = await context.new_page()

        # Load blank page initially
        await page.goto("about:blank")

        return browser, page