Skip to main content
Glama

MCP Operator

by willer
browser.py36.3 kB
#!/usr/bin/env python3 """ Browser operator implementation for MCP """ import os import sys import json import asyncio import uuid import base64 from typing import Dict, Any, Optional, List, Union from datetime import datetime from pathlib import Path from urllib.parse import urlparse import logging # Set up logging (file only, no stdout to preserve MCP protocol) log_dir = Path(os.environ.get("MCP_LOG_DIR", "logs")) log_dir.mkdir(exist_ok=True) log_file = log_dir / f"mcp_operator_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(log_file), # No stream handler to avoid interfering with MCP JSON-RPC ] ) logger = logging.getLogger("mcp-operator") # Import CUA components from mcp_operator.cua.agent import Agent from mcp_operator.cua.computer import AsyncLocalPlaywrightComputer class BrowserInstance: """Manages a single browser instance""" def __init__(self, project_name: str): """Initialize browser instance Args: project_name: Unique identifier for this browser instance """ self.project_name = project_name self.headless = True # Default to headless mode self.dimensions = (1280, 1024) # Default browser dimensions self.browser = None self.context = None self.page = None self.playwright = None self.playwright_context = None self.initialized = False logger.info(f"Browser instance created for project: {project_name}") async def initialize(self): """Initialize the browser using Playwright""" width, height = self.dimensions from playwright.async_api import async_playwright self.playwright_context = async_playwright() self.playwright = await self.playwright_context.__aenter__() # Configure browser launch options browser_options = { "headless": self.headless, "args": [ f"--window-size={width},{height}", "--disable-extensions", "--disable-web-security", "--disable-infobars", "--disable-notifications" ] } logger.info(f"Launching browser with options: {browser_options}") self.browser = await self.playwright.chromium.launch(**browser_options) # Create a context with specified viewport dimensions self.context = await self.browser.new_context( viewport={"width": width, "height": height}, user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36" ) # Set up the domain filter to protect against malicious websites async def handle_route(route, request): url = request.url hostname = urlparse(url).hostname # Block known harmful domains blocked_domains = ["evil.com", "malware.org", "phishing.com"] if hostname and any(hostname.endswith(domain) for domain in blocked_domains): logger.warning(f"Blocked access to harmful site: {url}") await route.abort() else: await route.continue_() await self.context.route("**/*", handle_route) # Create the page self.page = await self.context.new_page() await self.page.goto("about:blank") self.initialized = True logger.info(f"Browser initialized for project: {self.project_name}") async def close(self): """Close the browser and cleanup resources""" if self.page: try: await self.page.close() except Exception as e: logger.error(f"Error closing page: {e}") if self.context: try: await self.context.close() except Exception as e: logger.error(f"Error closing context: {e}") if self.browser: try: await self.browser.close() except Exception as e: logger.error(f"Error closing browser: {e}") if self.playwright_context: try: await self.playwright_context.__aexit__(None, None, None) except Exception as e: logger.error(f"Error stopping playwright: {e}") self.page = None self.context = None self.browser = None self.playwright = None self.playwright_context = None self.initialized = False logger.info(f"Browser closed for project: {self.project_name}") class Job: """Represents a browser operation job""" def __init__(self, job_id: str, project_name: str, operation: str, **kwargs): """Initialize a job Args: job_id: Unique identifier for this job project_name: The project this job belongs to operation: Type of operation (create, navigate, operate, close) **kwargs: Additional job parameters """ self.job_id = job_id self.project_name = project_name self.operation = operation self.params = kwargs self.status = "pending" self.result = None self.error = None self.created_at = datetime.now().isoformat() self.completed_at = None logger.info(f"Job created: {job_id} - {operation} for project {project_name}") def to_dict(self) -> Dict[str, Any]: """Convert job to dictionary representation Returns: Dict containing job details """ return { "job_id": self.job_id, "project_name": self.project_name, "operation": self.operation, "params": self.params, "status": self.status, "result": self.result, "error": self.error, "created_at": self.created_at, "completed_at": self.completed_at } def complete(self, result: Any = None): """Mark the job as completed with results Args: result: The result data from the operation """ self.status = "completed" self.result = result self.completed_at = datetime.now().isoformat() logger.info(f"Job completed: {self.job_id}") def fail(self, error: str): """Mark the job as failed with error message Args: error: Error message describing the failure """ self.status = "failed" self.error = error self.completed_at = datetime.now().isoformat() logger.error(f"Job failed: {self.job_id} - {error}") class BrowserOperator: """Manages browser automation through MCP""" def __init__(self, project_name: Optional[str] = None): """Initialize the browser operator Args: project_name: Optional project name, will be auto-generated if not provided """ self.project_name = project_name or f"browser-{uuid.uuid4().hex[:8]}" self.browser_instance = None self.agent = None self.jobs: Dict[str, Job] = {} self.allow_domains = [ "about:blank", "google.com", "www.google.com", "github.com", "www.github.com", "example.com", "www.example.com", "wikipedia.org", "www.wikipedia.org", "cnn.com", "www.cnn.com", "openai.com", "www.openai.com", "anthropic.com", "www.anthropic.com" ] logger.info(f"Browser operator initialized with project name: {self.project_name}") def _generate_job_id(self) -> str: """Generate a unique job ID Returns: Unique job ID string """ return f"job-{uuid.uuid4().hex}" async def create_browser(self) -> Dict[str, Any]: """Create a new browser instance Returns: Dict with job information """ job_id = self._generate_job_id() job = Job(job_id, self.project_name, "create") self.jobs[job_id] = job try: # Check if browser already exists if self.browser_instance: await self.close() # Create and initialize the browser self.browser_instance = BrowserInstance(self.project_name) await self.browser_instance.initialize() # Complete the job successfully job.complete({"project_name": self.project_name}) except Exception as e: logger.exception("Error creating browser") job.fail(str(e)) return {"job_id": job_id} async def navigate_browser(self, url: str) -> Dict[str, Any]: """Navigate the browser to a URL Args: url: URL to navigate to Returns: Dict with job information """ job_id = self._generate_job_id() job = Job(job_id, self.project_name, "navigate", url=url) self.jobs[job_id] = job try: # Ensure browser is initialized if not self.browser_instance or not self.browser_instance.initialized: raise ValueError("Browser not initialized. Call create_browser first.") # Check URL safety parsed_url = urlparse(url) hostname = parsed_url.hostname # Validate URL if not parsed_url.scheme or not hostname: raise ValueError(f"Invalid URL: {url}") # Check if domain is allowed is_allowed = any(hostname.endswith(domain) for domain in self.allow_domains) if not is_allowed: raise ValueError(f"Domain not allowed: {hostname}") # Navigate to URL logger.info(f"Navigating to URL: {url}") await self.browser_instance.page.goto(url, wait_until="domcontentloaded") # Take screenshot after navigation screenshot = await self.browser_instance.page.screenshot() screenshot_base64 = base64.b64encode(screenshot).decode("utf-8") # Complete the job successfully job.complete({ "current_url": self.browser_instance.page.url, "screenshot": screenshot_base64 }) except Exception as e: logger.exception(f"Error navigating to {url}") job.fail(str(e)) return {"job_id": job_id} async def operate_browser(self, instruction: str) -> Dict[str, Any]: """Operate the browser based on a natural language instruction Args: instruction: Natural language instruction to execute Returns: Dict with job information """ job_id = self._generate_job_id() job = Job(job_id, self.project_name, "operate", instruction=instruction) self.jobs[job_id] = job try: # Ensure browser is initialized if not self.browser_instance or not self.browser_instance.initialized: raise ValueError("Browser not initialized. Call create_browser first.") # Process the instruction using CUA result = await self.process_message(instruction) # Complete the job successfully job.complete(result) except Exception as e: logger.exception(f"Error operating browser with instruction: {instruction}") job.fail(str(e)) return {"job_id": job_id} async def process_message(self, instruction: str) -> Dict[str, Any]: """Process a natural language instruction using CUA Args: instruction: The instruction to process Returns: Dict with results of the operation """ logger.info(f"Processing instruction: {instruction}") # Initialize CUA agent if not already done if not self.agent: # Create computer instance that will communicate with our browser logger.info(f"Creating computer instance with headless={self.browser_instance.headless}") computer = AsyncLocalPlaywrightComputer( headless=self.browser_instance.headless, width=self.browser_instance.dimensions[0], height=self.browser_instance.dimensions[1], allowed_domains=self.allow_domains ) # Create the agent self.agent = Agent( model="computer-use-preview", # Use the specialized computer use model computer=computer, allowed_domains=self.allow_domains ) logger.info("CUA agent initialized") # Run the agent to perform the instruction try: logger.info("Running CUA agent") result = await self.agent.run(instruction, max_steps=20) logger.info(f"Agent completed with success={result.success}") # Take final screenshot screenshot = await self.browser_instance.page.screenshot() screenshot_base64 = base64.b64encode(screenshot).decode("utf-8") # Create GIF from screen captures if available gif_path = None if result.screen_captures and len(result.screen_captures) > 0: try: gif_dir = Path("./screenshots") gif_dir.mkdir(exist_ok=True) gif_path = str(gif_dir / f"{self.project_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.gif") self.agent.create_gif(gif_path) except Exception as e: logger.error(f"Error creating GIF: {e}") # Get current URL current_url = self.browser_instance.page.url # Get console logs console_logs = await self.get_console_logs() # Return results return { "success": result.success, "text": result.message, "screenshot": screenshot_base64, "current_url": current_url, "console_logs": console_logs, "gif_path": gif_path, "actions_executed": len(result.screen_captures) if hasattr(result, "screen_captures") else 0 } except Exception as e: logger.exception("Error in CUA agent execution") raise async def close(self) -> Dict[str, Any]: """Close the browser instance Returns: Dict with job information """ job_id = self._generate_job_id() job = Job(job_id, self.project_name, "close") self.jobs[job_id] = job try: if self.browser_instance: await self.browser_instance.close() self.browser_instance = None # Reset agent self.agent = None # Complete the job successfully job.complete({"project_name": self.project_name, "status": "closed"}) except Exception as e: logger.exception("Error closing browser") job.fail(str(e)) return {"job_id": job_id} def get_job_status(self, job_id: str) -> Dict[str, Any]: """Get the status of a job Args: job_id: ID of the job to check Returns: Dict with job status information """ if job_id not in self.jobs: return {"error": f"Job not found: {job_id}"} return self.jobs[job_id].to_dict() def list_jobs(self, limit: int = 10) -> List[Dict[str, Any]]: """List recent jobs Args: limit: Maximum number of jobs to return Returns: List of job dictionaries """ # Sort jobs by creation time (newest first) and limit sorted_jobs = sorted( self.jobs.values(), key=lambda job: job.created_at, reverse=True )[:limit] return [job.to_dict() for job in sorted_jobs] async def add_note(self, name: str, content: str) -> Dict[str, Any]: """Add a user note Args: name: Name/title of the note content: Content of the note Returns: Dict with job information """ job_id = self._generate_job_id() job = Job(job_id, self.project_name, "add_note", name=name, content=content) self.jobs[job_id] = job try: # Save note to file notes_dir = Path("./notes") notes_dir.mkdir(exist_ok=True) note_file = notes_dir / f"{self.project_name}_{name.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" with open(note_file, "w") as f: f.write(f"Title: {name}\n") f.write(f"Date: {datetime.now().isoformat()}\n") f.write(f"Project: {self.project_name}\n") f.write("-" * 40 + "\n") f.write(content) # Complete the job successfully job.complete({"note_file": str(note_file)}) except Exception as e: logger.exception(f"Error adding note: {name}") job.fail(str(e)) return {"job_id": job_id} # Browser debugging tools async def get_console_logs(self) -> List[Dict[str, Any]]: """Get browser console logs Returns: List of console log entries """ if not self.browser_instance or not self.browser_instance.initialized: return [{"error": "Browser not initialized"}] # Create a list to store logs logs = [] # Set up a listener to capture logs if not already set up try: # Using playwright's console API page = self.browser_instance.page # Collect logs using evaluate result = await page.evaluate(""" () => { return window.console_logs || []; } """) if result: logs.extend(result) return logs except Exception as e: logger.exception("Error getting console logs") return [{"error": str(e)}] async def get_console_errors(self) -> List[Dict[str, Any]]: """Get browser console errors Returns: List of console error entries """ # Get all logs and filter for errors logs = await self.get_console_logs() # Filter for error logs only return [log for log in logs if log.get("type") == "error"] async def get_network_logs(self) -> List[Dict[str, Any]]: """Get browser network logs Returns: List of network log entries """ if not self.browser_instance or not self.browser_instance.initialized: return [{"error": "Browser not initialized"}] try: # Using playwright to get network logs page = self.browser_instance.page # Collect network logs using evaluate result = await page.evaluate(""" () => { return window.network_logs || []; } """) if result: return result return [] except Exception as e: logger.exception("Error getting network logs") return [{"error": str(e)}] async def get_network_errors(self) -> List[Dict[str, Any]]: """Get browser network errors Returns: List of network error entries """ # Get all network logs and filter for errors logs = await self.get_network_logs() # Filter for error logs only return [log for log in logs if log.get("status") >= 400] async def take_screenshot(self) -> Dict[str, Any]: """Take a screenshot of the current page Returns: Dict with screenshot data """ if not self.browser_instance or not self.browser_instance.initialized: return {"error": "Browser not initialized"} try: # Take screenshot screenshot = await self.browser_instance.page.screenshot() screenshot_base64 = base64.b64encode(screenshot).decode("utf-8") return {"screenshot": screenshot_base64} except Exception as e: logger.exception("Error taking screenshot") return {"error": str(e)} async def get_selected_element(self) -> Dict[str, Any]: """Get information about the currently selected element Returns: Dict with element information """ if not self.browser_instance or not self.browser_instance.initialized: return {"error": "Browser not initialized"} try: # Using playwright to get selected element page = self.browser_instance.page # Get information about currently focused element element_info = await page.evaluate(""" () => { const activeElement = document.activeElement; if (!activeElement || activeElement === document.body) { return { found: false }; } const rect = activeElement.getBoundingClientRect(); return { found: true, tag: activeElement.tagName.toLowerCase(), id: activeElement.id, className: activeElement.className, text: activeElement.textContent?.trim().substring(0, 100) || "", attributes: Array.from(activeElement.attributes).map(attr => ({ name: attr.name, value: attr.value })), position: { x: rect.left, y: rect.top, width: rect.width, height: rect.height } }; } """) return element_info except Exception as e: logger.exception("Error getting selected element") return {"error": str(e)} async def wipe_logs(self) -> Dict[str, str]: """Wipe browser logs from memory Returns: Dict with status message """ if not self.browser_instance or not self.browser_instance.initialized: return {"error": "Browser not initialized"} try: # Clear logs using evaluate await self.browser_instance.page.evaluate(""" () => { window.console_logs = []; window.network_logs = []; console.log("Logs wiped"); } """) return {"status": "Logs wiped successfully"} except Exception as e: logger.exception("Error wiping logs") return {"error": str(e)} # Audit tools async def _run_audit(self, audit_type: str) -> Dict[str, Any]: """Run a generic audit on the current page Args: audit_type: Type of audit to run Returns: Dict with audit results """ if not self.browser_instance or not self.browser_instance.initialized: return {"error": "Browser not initialized"} try: # Using a simplified audit mechanism page = self.browser_instance.page # Run appropriate audit based on type audit_script = f""" () => {{ // Simple audit implementation const results = {{}}; // Common function to check meta tags const checkMetaTags = () => {{ const metas = document.querySelectorAll('meta'); const metaInfo = Array.from(metas).map(meta => {{ return {{ name: meta.getAttribute('name'), property: meta.getAttribute('property'), content: meta.getAttribute('content') }}; }}); return metaInfo; }}; // Check basic page metrics const getBasicMetrics = () => {{ return {{ title: document.title, url: window.location.href, loadTime: performance.now(), docType: document.doctype ? document.doctype.name : 'unknown', elementsCount: document.getElementsByTagName('*').length }}; }}; results.basicMetrics = getBasicMetrics(); // Specific audit logic based on type if ('{audit_type}' === 'accessibility') {{ // Basic accessibility checks const imgWithoutAlt = document.querySelectorAll('img:not([alt])').length; const formsWithoutLabels = document.querySelectorAll('input:not([id])').length; const headingLevelsSkipped = (function() {{ const headings = document.querySelectorAll('h1, h2, h3, h4, h5, h6'); const levels = new Set(); for (const heading of headings) {{ levels.add(parseInt(heading.tagName[1])); }} const ordered = Array.from(levels).sort(); let skipped = false; for (let i = 1; i < ordered.length; i++) {{ if (ordered[i] - ordered[i-1] > 1) {{ skipped = true; break; }} }} return skipped; }})(); results.accessibility = {{ imgWithoutAlt, formsWithoutLabels, headingLevelsSkipped, ariaUsage: document.querySelectorAll('[aria-*]').length, colorContrast: 'Manual check required' }}; }} if ('{audit_type}' === 'performance') {{ // Basic performance metrics const perfEntries = performance.getEntriesByType('navigation'); results.performance = perfEntries.length > 0 ? perfEntries[0] : {{ loadTime: performance.now(), resourceCount: performance.getEntriesByType('resource').length, scriptCount: document.querySelectorAll('script').length, stylesheetCount: document.querySelectorAll('link[rel="stylesheet"]').length, imageCount: document.querySelectorAll('img').length, totalBytes: 'Cannot calculate without browser API' }}; }} if ('{audit_type}' === 'seo') {{ // Basic SEO checks results.seo = {{ metaTags: checkMetaTags(), headings: {{ h1: document.querySelectorAll('h1').length, h2: document.querySelectorAll('h2').length, h3: document.querySelectorAll('h3').length }}, imgWithAlt: document.querySelectorAll('img[alt]').length, links: document.querySelectorAll('a').length, canonicalLink: document.querySelector('link[rel="canonical"]')?.href }}; }} if ('{audit_type}' === 'nextjs') {{ // Check for NextJS specific patterns const isNextJS = Boolean( document.querySelector('#__next') || document.querySelector('script#__NEXT_DATA__') ); results.nextjs = {{ isNextJS, nextRoot: Boolean(document.querySelector('#__next')), nextData: Boolean(document.querySelector('script#__NEXT_DATA__')), headManager: Boolean(document.querySelector('noscript#__next_css__DO_NOT_USE__')) }}; }} if ('{audit_type}' === 'bestPractices') {{ // Basic best practices checks results.bestPractices = {{ docType: document.doctype !== null, viewport: document.querySelector('meta[name="viewport"]') !== null, charset: document.querySelector('meta[charset]') !== null, consoleErrors: typeof window.console_logs === 'object' ? window.console_logs.filter(log => log.type === 'error').length : 'Console logs not captured', deprecatedHtml: document.querySelectorAll('center, font, frame, frameset, marquee').length, inlineStyles: document.querySelectorAll('[style]').length, inlineJS: document.querySelectorAll('*[onclick], *[onload], *[onsubmit]').length }}; }} if ('{audit_type}' === 'debugger') {{ // Collect debug information results.debugInfo = {{ dom: {{ bodyClasses: document.body.className, bodyId: document.body.id, elementCount: document.getElementsByTagName('*').length, scripts: Array.from(document.scripts).map(s => s.src).filter(Boolean), stylesheets: Array.from(document.styleSheets).length, iframes: document.querySelectorAll('iframe').length }}, environment: {{ userAgent: navigator.userAgent, language: navigator.language, screenSize: `${{window.innerWidth}}x${{window.innerHeight}}`, devicePixelRatio: window.devicePixelRatio, urlParams: Object.fromEntries(new URLSearchParams(window.location.search)) }} }}; }} if ('{audit_type}' === 'audit') {{ // Run all audits // Accessibility const imgWithoutAlt = document.querySelectorAll('img:not([alt])').length; const formsWithoutLabels = document.querySelectorAll('input:not([id])').length; results.accessibility = {{ imgWithoutAlt, formsWithoutLabels, ariaUsage: document.querySelectorAll('[aria-*]').length }}; // Performance const perfEntries = performance.getEntriesByType('navigation'); results.performance = perfEntries.length > 0 ? perfEntries[0] : {{ loadTime: performance.now(), resourceCount: performance.getEntriesByType('resource').length }}; // SEO results.seo = {{ metaTags: checkMetaTags(), headings: {{ h1: document.querySelectorAll('h1').length, h2: document.querySelectorAll('h2').length, h3: document.querySelectorAll('h3').length }} }}; // Best Practices results.bestPractices = {{ docType: document.doctype !== null, viewport: document.querySelector('meta[name="viewport"]') !== null, charset: document.querySelector('meta[charset]') !== null }}; }} return results; }} """ # Execute the audit script audit_results = await page.evaluate(audit_script) # Add timestamp audit_results["timestamp"] = datetime.now().isoformat() audit_results["url"] = page.url return audit_results except Exception as e: logger.exception(f"Error running {audit_type} audit") return {"error": str(e)} async def run_accessibility_audit(self) -> Dict[str, Any]: """Run an accessibility audit on the current page Returns: Dict with accessibility audit results """ return await self._run_audit("accessibility") async def run_performance_audit(self) -> Dict[str, Any]: """Run a performance audit on the current page Returns: Dict with performance audit results """ return await self._run_audit("performance") async def run_seo_audit(self) -> Dict[str, Any]: """Run an SEO audit on the current page Returns: Dict with SEO audit results """ return await self._run_audit("seo") async def run_nextjs_audit(self) -> Dict[str, Any]: """Run a NextJS-specific audit on the current page Returns: Dict with NextJS audit results """ return await self._run_audit("nextjs") async def run_best_practices_audit(self) -> Dict[str, Any]: """Run a best practices audit on the current page Returns: Dict with best practices audit results """ return await self._run_audit("bestPractices") async def run_debugger_mode(self) -> Dict[str, Any]: """Run debugger mode to collect diagnostic information Returns: Dict with debug information """ return await self._run_audit("debugger") async def run_audit_mode(self) -> Dict[str, Any]: """Run comprehensive audit mode Returns: Dict with comprehensive audit results """ return await self._run_audit("audit")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/willer/mcp-operator'

If you have feedback or need assistance with the MCP directory API, please join our Discord server