Puppeteer MCP Server

  • src
  • playwright_server
import asyncio from mcp.server.models import InitializationOptions import mcp.types as types from mcp.server import NotificationOptions, Server from pydantic import AnyUrl import mcp.server.stdio server = Server("playwright-server") @server.list_resources() async def handle_list_resources() -> list[types.Resource]: """ List available note resources. Each note is exposed as a resource with a custom note:// URI scheme. """ return [] @server.read_resource() async def handle_read_resource(uri: AnyUrl) -> str: """ Read a specific note's content by its URI. The note name is extracted from the URI host component. """ raise ValueError(f"Unsupported URI scheme: {uri.scheme}") @server.list_prompts() async def handle_list_prompts() -> list[types.Prompt]: """ List available prompts. Each prompt can have optional arguments to customize its behavior. """ return [] @server.get_prompt() async def handle_get_prompt( name: str, arguments: dict[str, str] | None ) -> types.GetPromptResult: """ Generate a prompt by combining arguments with server state. The prompt includes all current notes and can be customized via arguments. """ raise ValueError(f"Unknown prompt: {name}") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """ List available tools. Each tool specifies its arguments using JSON Schema validation. """ return [ # types.Tool( # name="playwright_new_session", # description="Create a new browser session", # inputSchema={ # "type": "object", # "properties": { # "url": {"type": "string", "description": "Initial URL to navigate to"} # } # } # ), types.Tool( name="playwright_navigate", description="Navigate to a URL,thip op will auto create a session", inputSchema={ "type": "object", "properties": { "url": {"type": "string"} }, "required": ["url"] } ), types.Tool( name="playwright_screenshot", description="Take a screenshot of the current page or a specific element", inputSchema={ "type": "object", "properties": { "name": {"type": "string"}, "selector": {"type": "string", "description": "CSS selector for element to screenshot,null is full page"}, }, "required": ["name"] } ), types.Tool( name="playwright_click", description="Click an element on the page using CSS selector", inputSchema={ "type": "object", "properties": { "selector": {"type": "string", "description": "CSS selector for element to click"} }, "required": ["selector"] } ), types.Tool( name="playwright_fill", description="Fill out an input field", inputSchema={ "type": "object", "properties": { "selector": {"type": "string", "description": "CSS selector for input field"}, "value": {"type": "string", "description": "Value to fill"} }, "required": ["selector", "value"] } ), types.Tool( name="playwright_evaluate", description="Execute JavaScript in the browser console", inputSchema={ "type": "object", "properties": { "script": {"type": "string", "description": "JavaScript code to execute"} }, "required": ["script"] } ), types.Tool( name="playwright_click_text", description="Click an element on the page by its text content", inputSchema={ "type": "object", "properties": { "text": {"type": "string", "description": "Text content of the element to click"} }, "required": ["text"] } ), types.Tool( name="playwright_get_text_content", description="Get the text content of all elements", inputSchema={ "type": "object", "properties": { }, } ), types.Tool( name="playwright_get_html_content", description="Get the HTML content of the page", inputSchema={ "type": "object", "properties": { "selector": {"type": "string", "description": "CSS selector for the element"} }, "required": ["selector"] } ) ] import uuid from playwright.async_api import async_playwright import base64 import os import asyncio def update_page_after_click(func): async def wrapper(self, name: str, arguments: dict | None): if not self._sessions: return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] new_page_future = asyncio.ensure_future(page.context.wait_for_event("page", timeout=3000)) result = await func(self, name, arguments) try: new_page = await new_page_future await new_page.wait_for_load_state() self._sessions[session_id]["page"] = new_page except: pass # if page.url != self._sessions[session_id]["page"].url: # await page.wait_for_load_state() # self._sessions[session_id]["page"] = page return result return wrapper class ToolHandler: _sessions: dict[str, any] = {} _playwright: any = None async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: raise NotImplementedError class NewSessionToolHandler(ToolHandler): async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: self._playwright = await async_playwright().start() browser = await self._playwright.chromium.launch(headless=False) page = await browser.new_page() session_id = str(uuid.uuid4()) self._sessions[session_id] = {"browser": browser, "page": page} url = arguments.get("url") if url: if not url.startswith("http://") and not url.startswith("https://"): url = "https://" + url await page.goto(url) return [types.TextContent(type="text", text="succ")] class NavigateToolHandler(ToolHandler): async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if not self._sessions: await NewSessionToolHandler().handle("",{}) # return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] url = arguments.get("url") if not url.startswith("http://") and not url.startswith("https://"): url = "https://" + url await page.goto(url) text_content=await GetTextContentToolHandler().handle("",{}) return [types.TextContent(type="text", text=f"Navigated to {url}\npage_text_content[:200]:\n\n{text_content[:200]}")] class ScreenshotToolHandler(ToolHandler): async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if not self._sessions: return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] name = arguments.get("name") selector = arguments.get("selector") # full_page = arguments.get("fullPage", False) if selector: element = await page.locator(selector) await element.screenshot(path=f"{name}.png") else: await page.screenshot(path=f"{name}.png", full_page=True) with open(f"{name}.png", "rb") as image_file: encoded_string = base64.b64encode(image_file.read()).decode("utf-8") os.remove(f"{name}.png") return [types.ImageContent(type="image", data=encoded_string, mimeType="image/png")] class ClickToolHandler(ToolHandler): @update_page_after_click async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if not self._sessions: return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] selector = arguments.get("selector") await page.locator(selector).click() return [types.TextContent(type="text", text=f"Clicked element with selector {selector}")] class FillToolHandler(ToolHandler): async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if not self._sessions: return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] selector = arguments.get("selector") value = arguments.get("value") await page.locator(selector).fill(value) return [types.TextContent(type="text", text=f"Filled element with selector {selector} with value {value}")] class EvaluateToolHandler(ToolHandler): async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if not self._sessions: return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] script = arguments.get("script") result = await page.evaluate(script) return [types.TextContent(type="text", text=f"Evaluated script, result: {result}")] class ClickTextToolHandler(ToolHandler): @update_page_after_click async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if not self._sessions: return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] text = arguments.get("text") await page.locator(f"text={text}").nth(0).click() return [types.TextContent(type="text", text=f"Clicked element with text {text}")] class GetTextContentToolHandler(ToolHandler): async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if not self._sessions: return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] # text_contents = await page.locator('body').all_inner_texts() async def get_unique_texts_js(page): unique_texts = await page.evaluate('''() => { var elements = Array.from(document.querySelectorAll('*')); // 先选择所有元素,再进行过滤 var uniqueTexts = new Set(); for (var element of elements) { if (element.offsetWidth > 0 || element.offsetHeight > 0) { // 判断是否可见 var childrenCount = element.querySelectorAll('*').length; if (childrenCount <= 3) { var innerText = element.innerText ? element.innerText.trim() : ''; if (innerText && innerText.length <= 1000) { uniqueTexts.add(innerText); } var value = element.getAttribute('value'); if (value) { uniqueTexts.add(value); } } } } //console.log( Array.from(uniqueTexts)); return Array.from(uniqueTexts); } ''') return unique_texts # 使用示例 text_contents = await get_unique_texts_js(page) return [types.TextContent(type="text", text=f"Text content of all elements: {text_contents}")] class GetHtmlContentToolHandler(ToolHandler): async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if not self._sessions: return [types.TextContent(type="text", text="No active session. Please create a new session first.")] session_id = list(self._sessions.keys())[-1] page = self._sessions[session_id]["page"] selector = arguments.get("selector") html_content = await page.locator(selector).inner_html() return [types.TextContent(type="text", text=f"HTML content of element with selector {selector}: {html_content}")] tool_handlers = { "playwright_navigate": NavigateToolHandler(), "playwright_screenshot": ScreenshotToolHandler(), "playwright_click": ClickToolHandler(), "playwright_fill": FillToolHandler(), "playwright_evaluate": EvaluateToolHandler(), "playwright_click_text": ClickTextToolHandler(), "playwright_get_text_content": GetTextContentToolHandler(), "playwright_get_html_content": GetHtmlContentToolHandler(), "playwright_new_session":NewSessionToolHandler(), } @server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ Handle tool execution requests. Tools can modify server state and notify clients of changes. """ if name in tool_handlers: return await tool_handlers[name].handle(name, arguments) else: raise ValueError(f"Unknown tool: {name}") async def main(): # Run the server using stdin/stdout streams async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="playwright-plus-server", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(main())