Skip to main content
Glama

Playwright MCP

server.py17.4 kB
import asyncio import base64 from io import BytesIO from typing import Dict, List, Optional, Any, Union from mcp.server.models import InitializationOptions import mcp.types as types from mcp.server import NotificationOptions, Server from pydantic import AnyUrl import mcp.server.stdio from playwright.async_api import async_playwright, Browser, Page, BrowserContext # Global Playwright state playwright_instance = None browser: Optional[Browser] = None context: Optional[BrowserContext] = None page: Optional[Page] = None pages: Dict[str, Page] = {} current_page_id: Optional[str] = None server = Server("playwright-mcp") @server.list_resources() async def handle_list_resources() -> list[types.Resource]: """ List available browser screenshot resources. """ resources = [] if pages: for page_id, page in pages.items(): resources.append( types.Resource( uri=AnyUrl(f"screenshot://{page_id}"), name=f"Screenshot: {page.url}", description=f"Current screenshot of page at {page.url}", mimeType="image/png", ) ) return resources @server.read_resource() async def handle_read_resource(uri: AnyUrl) -> bytes: """ Capture and return a screenshot from the requested page. """ if uri.scheme != "screenshot": raise ValueError(f"Unsupported URI scheme: {uri.scheme}") page_id = uri.host if page_id not in pages: raise ValueError(f"Page not found: {page_id}") # Take a screenshot of the page screenshot = await pages[page_id].screenshot() return screenshot @server.list_resource_templates() async def handle_list_resource_templates() -> list[types.ResourceTemplate]: """ List available resource templates for browser automation. """ return [] async def ensure_browser(): """Ensure the browser is launched and ready.""" global playwright_instance, browser, context, page, current_page_id if playwright_instance is None: playwright_instance = await async_playwright().start() browser = await playwright_instance.chromium.launch(headless=False) context = await browser.new_context() page = await context.new_page() pages["default"] = page current_page_id = "default" @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """ List available Playwright browser automation tools. """ return [ types.Tool( name="navigate", description="Navigate to a URL", inputSchema={ "type": "object", "properties": { "url": {"type": "string"}, "page_id": {"type": "string"}, }, "required": ["url"], }, ), types.Tool( name="click", description="Click on an element by selector", inputSchema={ "type": "object", "properties": { "selector": {"type": "string"}, "page_id": {"type": "string"}, }, "required": ["selector"], }, ), types.Tool( name="type", description="Type text into an input element", inputSchema={ "type": "object", "properties": { "selector": {"type": "string"}, "text": {"type": "string"}, "page_id": {"type": "string"}, }, "required": ["selector", "text"], }, ), types.Tool( name="get_text", description="Get text content from an element", inputSchema={ "type": "object", "properties": { "selector": {"type": "string"}, "page_id": {"type": "string"}, }, "required": ["selector"], }, ), types.Tool( name="get_page_content", description="Get the current page HTML content", inputSchema={ "type": "object", "properties": { "page_id": {"type": "string"}, }, }, ), types.Tool( name="take_screenshot", description="Take a screenshot of the current page", inputSchema={ "type": "object", "properties": { "page_id": {"type": "string"}, "selector": {"type": "string"}, }, }, ), types.Tool( name="new_page", description="Create a new browser page", inputSchema={ "type": "object", "properties": { "page_id": {"type": "string"}, }, "required": ["page_id"], }, ), types.Tool( name="switch_page", description="Switch to a different browser page", inputSchema={ "type": "object", "properties": { "page_id": {"type": "string"}, }, "required": ["page_id"], }, ), types.Tool( name="get_pages", description="List all available browser pages", inputSchema={ "type": "object", "properties": {}, }, ), types.Tool( name="wait_for_selector", description="Wait for an element to be visible on the page", inputSchema={ "type": "object", "properties": { "selector": {"type": "string"}, "page_id": {"type": "string"}, "timeout": {"type": "number"}, }, "required": ["selector"], }, ), ] def get_active_page(page_id: Optional[str] = None) -> Page: """Get the active page based on page_id or current default.""" global current_page_id if page_id is None: page_id = current_page_id if page_id not in pages: raise ValueError(f"Page not found: {page_id}") return pages[page_id] @server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ Handle tool execution requests for Playwright browser automation. """ global current_page_id if not arguments: arguments = {} # Ensure browser is initialized await ensure_browser() # Process tool calls based on name if name == "navigate": url = arguments.get("url") if not url: raise ValueError("URL is required") page = get_active_page(arguments.get("page_id")) await page.goto(url) return [types.TextContent(type="text", text=f"Navigated to {url}")] elif name == "click": selector = arguments.get("selector") if not selector: raise ValueError("Selector is required") page = get_active_page(arguments.get("page_id")) await page.click(selector) return [types.TextContent(type="text", text=f"Clicked element at selector: {selector}")] elif name == "type": selector = arguments.get("selector") text = arguments.get("text") if not selector or text is None: raise ValueError("Selector and text are required") page = get_active_page(arguments.get("page_id")) await page.fill(selector, text) return [types.TextContent(type="text", text=f"Typed '{text}' into {selector}")] elif name == "get_text": selector = arguments.get("selector") if not selector: raise ValueError("Selector is required") page = get_active_page(arguments.get("page_id")) text = await page.text_content(selector) return [types.TextContent(type="text", text=text or "")] elif name == "get_page_content": page = get_active_page(arguments.get("page_id")) content = await page.content() return [types.TextContent(type="text", text=content)] elif name == "take_screenshot": page = get_active_page(arguments.get("page_id")) selector = arguments.get("selector") if selector: screenshot = await page.locator(selector).screenshot() else: screenshot = await page.screenshot() # Convert the bytes to base64 base64_image = base64.b64encode(screenshot).decode('utf-8') # Return as ImageContent return [types.ImageContent( type="image", image=types.ImageData( mime_type="image/png", data=base64_image ) )] elif name == "new_page": page_id = arguments.get("page_id") if not page_id: raise ValueError("Page ID is required") if page_id in pages: raise ValueError(f"Page ID '{page_id}' already exists") new_page = await context.new_page() pages[page_id] = new_page current_page_id = page_id return [types.TextContent(type="text", text=f"Created new page with ID: {page_id}")] elif name == "switch_page": page_id = arguments.get("page_id") if not page_id: raise ValueError("Page ID is required") if page_id not in pages: raise ValueError(f"Page ID '{page_id}' not found") current_page_id = page_id return [types.TextContent(type="text", text=f"Switched to page: {page_id}")] elif name == "get_pages": page_info = [] for page_id, page in pages.items(): page_info.append(f"{page_id}: {page.url}") return [types.TextContent(type="text", text="Available pages:\n" + "\n".join(page_info))] elif name == "wait_for_selector": selector = arguments.get("selector") if not selector: raise ValueError("Selector is required") timeout = arguments.get("timeout", 30000) # Default 30 seconds page = get_active_page(arguments.get("page_id")) try: await page.wait_for_selector(selector, timeout=timeout) return [types.TextContent(type="text", text=f"Element found: {selector}")] except Exception as e: return [types.TextContent(type="text", text=f"Timeout waiting for element: {selector}")] else: raise ValueError(f"Unknown tool: {name}") # Notify clients that resources may have changed await server.request_context.session.send_resource_list_changed() @server.list_prompts() async def handle_list_prompts() -> list[types.Prompt]: """ List available prompts for browser automation. """ return [ types.Prompt( name="interpret-page", description="Interpret the current web page content and structure", arguments=[ types.PromptArgument( name="page_id", description="ID of the page to interpret", required=False, ), types.PromptArgument( name="focus", description="What aspect to focus on (full, forms, navigation, text)", required=False, ) ], ) ] @server.get_prompt() async def handle_get_prompt( name: str, arguments: dict[str, str] | None ) -> types.GetPromptResult: """ Generate prompts for interpreting page content. """ global current_page_id if name != "interpret-page": raise ValueError(f"Unknown prompt: {name}") arguments = arguments or {} page_id = arguments.get("page_id", current_page_id) focus = arguments.get("focus", "full") if page_id not in pages: raise ValueError(f"Page ID '{page_id}' not found") page = pages[page_id] url = page.url # Get page title title = await page.title() # Get page content content = await page.content() # Take screenshot for visual context screenshot = await page.screenshot() screenshot_base64 = base64.b64encode(screenshot).decode('utf-8') # Build the prompt content based on focus prompt_text = f"Analyze this web page at URL: {url}\nTitle: {title}\n\n" if focus == "forms": prompt_text += "Focus on identifying all input forms, their fields, and how to interact with them.\n\n" # Extract form details form_info = await page.evaluate("""() => { const forms = Array.from(document.querySelectorAll('form')); return forms.map(form => { const inputs = Array.from(form.querySelectorAll('input, select, textarea, button')); return { id: form.id, action: form.action, method: form.method, inputs: inputs.map(input => ({ type: input.tagName.toLowerCase() === 'input' ? input.type : input.tagName.toLowerCase(), name: input.name, id: input.id, placeholder: input.placeholder || '', required: input.required || false })) }; }); }""") prompt_text += f"Form information: {form_info}\n\n" elif focus == "navigation": prompt_text += "Focus on identifying navigation elements, links, and page structure.\n\n" # Extract navigation and link details nav_info = await page.evaluate("""() => { const navs = Array.from(document.querySelectorAll('nav, header, [role="navigation"]')); const links = Array.from(document.querySelectorAll('a[href]')).slice(0, 20); return { navs: navs.map(nav => ({ id: nav.id, class: nav.className, links: Array.from(nav.querySelectorAll('a[href]')).map(a => ({ text: a.textContent.trim(), href: a.href })) })), topLinks: links.map(a => ({ text: a.textContent.trim(), href: a.href })) }; }""") prompt_text += f"Navigation information: {nav_info}\n\n" elif focus == "text": prompt_text += "Focus on extracting and summarizing the main text content.\n\n" # Extract main text content text_content = await page.evaluate("""() => { const paragraphs = Array.from(document.querySelectorAll('p, h1, h2, h3, h4, h5, h6, article')); return paragraphs.map(p => p.textContent.trim()).filter(t => t.length > 0).join('\\n'); }""") prompt_text += f"Main text content:\n{text_content}\n\n" else: # "full" prompt_text += "Provide a complete analysis of the page elements, content, and functionality.\n\n" prompt_text += "Key elements to identify:\n- Main content\n- Navigation\n- Forms\n- Interactive elements\n- Key information\n\n" return types.GetPromptResult( description=f"Interpret web page at {url}", messages=[ types.PromptMessage( role="user", content=[ types.TextContent(type="text", text=prompt_text), types.ImageContent( type="image", image=types.ImageData( mime_type="image/png", data=screenshot_base64 ) ) ] ) ], ) async def cleanup(): """Clean up Playwright resources.""" global browser, playwright_instance if browser: await browser.close() if playwright_instance: await playwright_instance.stop() async def main(): """Main entry point for the server.""" try: # Run the server using stdin/stdout streams async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="playwright-mcp", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) finally: # Ensure we clean up resources when the server shuts down await cleanup()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/misanthropic-ai/playwrite-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server