Skip to main content
Glama

Playwright Server MCP

by ziux
handles.py (12.9 kB)
"""Playwright-backed MCP tool handlers.

Each handler drives the page of the most recently created browser session
(sessions are kept in ``self._sessions``, provided by ``ToolHandler``) and
reports its outcome as a list of MCP content objects. Failures are caught
at the handler boundary and returned as text so the MCP client always gets
a response.
"""
import asyncio
import base64
import functools
import logging
import os
import uuid
from typing import Any

import mcp.types as types
from playwright.async_api import async_playwright

from playwright_server.tools.base import ToolHandler, Property

# Configure the logger: everything at INFO and above goes to both the
# console and a log file in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('playwright_server.log')
    ]
)
logger = logging.getLogger('playwright_server.tools')

# Message returned by every handler that requires an existing session.
_NO_SESSION_MESSAGE = "No active session. Please create a new session first."

# JavaScript run in the page by GetTextContentToolHandler. It collects the
# de-duplicated innerText (at most 1000 characters) and `value` attribute of
# every element that has a non-zero size and at most three descendants.
_UNIQUE_TEXTS_JS = '''() => {
    var elements = Array.from(document.querySelectorAll('*')); // 先选择所有元素,再进行过滤
    var uniqueTexts = new Set();

    for (var element of elements) {
        if (element.offsetWidth > 0 || element.offsetHeight > 0) { // 判断是否可见
            var childrenCount = element.querySelectorAll('*').length;
            if (childrenCount <= 3) {
                var innerText = element.innerText ? element.innerText.trim() : '';
                if (innerText && innerText.length <= 1000) {
                    uniqueTexts.add(innerText);
                }
                var value = element.getAttribute('value');
                if (value) {
                    uniqueTexts.add(value);
                }
            }
        }
    }
    //console.log( Array.from(uniqueTexts));
    return Array.from(uniqueTexts);
}
'''


def _text(message: str) -> list[types.TextContent]:
    """Wrap *message* in the single-item text response handlers return."""
    return [types.TextContent(type="text", text=message)]


def _with_scheme(url: str) -> str:
    """Return *url* unchanged if it has an http(s) scheme, else prefix https://."""
    if url.startswith(("http://", "https://")):
        return url
    return "https://" + url


def update_page_after_click(func):
    """Decorate a handler so the session follows a newly opened page.

    Before calling the wrapped handler, the wrapper starts listening (for up
    to 3 seconds) for a ``page`` event on the current page's browser context.
    If a new page appears, it is waited on until loaded and stored as the
    session's current page. If none appears, the session is left untouched.
    The wrapped handler's result is returned either way.

    If there is no session at all, the wrapped handler is not called and a
    "no active session" text response is returned instead.
    """
    @functools.wraps(func)
    async def wrapper(self, name: str, arguments: dict | None):
        logger.debug(f"执行装饰器 update_page_after_click,方法:{func.__name__}")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            return _text(_NO_SESSION_MESSAGE)

        # The most recently inserted session is treated as the active one.
        session_id = list(self._sessions.keys())[-1]
        page = self._sessions[session_id]["page"]
        logger.info(f"监听页面事件 'page',当前URL: {page.url}")

        # The listener must be scheduled *before* the handler runs, otherwise
        # a page opened by the handler's action could be missed.
        new_page_future = asyncio.ensure_future(
            page.context.wait_for_event("page", timeout=3000)
        )
        try:
            result = await func(self, name, arguments)
        except BaseException:
            # Do not leave the listener task dangling when the handler fails.
            new_page_future.cancel()
            raise

        try:
            logger.debug("等待新页面加载...")
            new_page = await new_page_future
            await new_page.wait_for_load_state()
            self._sessions[session_id]["page"] = new_page
            logger.info(f"页面已更新,新URL: {new_page.url}")
        except Exception as e:
            # Deliberate best-effort: a timeout here just means that no new
            # page was opened by the action.
            logger.debug(f"没有检测到页面切换: {str(e)}")
        return result
    return wrapper


class NewSessionToolHandler(ToolHandler):
    """Launch a headed Chromium browser and register it as a new session."""

    name = "playwright_new_session"
    description = "创建新的浏览器会话,打开一个浏览器窗口并可选择性地访问指定网址"
    inputSchema = [
        Property(name="url", typ="string", description="需要访问的初始网址,可选参数,不填则只打开浏览器", required=False)
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Create a session and optionally navigate to ``arguments["url"]``.

        Args:
            name: Tool name as dispatched; not used here.
            arguments: Optional mapping; ``url`` (optional) is the initial
                address, prefixed with ``https://`` when it has no scheme.

        Returns:
            A text response: ``"succ"`` on success, otherwise the failure
            message.
        """
        logger.info("开始创建新的浏览器会话")
        try:
            self._playwright = await async_playwright().start()
            logger.debug("Playwright 实例已创建")
            browser = await self._playwright.chromium.launch(headless=False)
            logger.debug("浏览器已启动")
            page = await browser.new_page()
            logger.debug("新页面已创建")

            session_id = str(uuid.uuid4())
            self._sessions[session_id] = {"browser": browser, "page": page}
            logger.info(f"会话已创建,ID: {session_id}")

            # `arguments` may be None; treat that the same as "no url" rather
            # than reporting a failure for a session that was in fact created.
            url = (arguments or {}).get("url")
            if url:
                url = _with_scheme(url)
                logger.info(f"正在导航到 URL: {url}")
                await page.goto(url)
                logger.debug(f"导航完成, 当前URL: {page.url}")

            return _text("succ")
        except Exception as e:
            logger.error(f"创建会话失败: {str(e)}", exc_info=True)
            return _text(f"创建会话失败: {str(e)}")


class NavigateToolHandler(ToolHandler):
    """Navigate the active page to a URL, creating a session if none exists."""

    name = "playwright_navigate"
    description = "浏览器导航到指定网址,如果没有活跃的浏览器会话,会自动创建一个新会话"
    inputSchema = [
        Property(name="url", typ="string", description="需要访问的网址,如不包含http或https前缀将自动添加https://")
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Go to ``arguments["url"]`` and return a short preview of the page text.

        Args:
            name: Tool name as dispatched; not used here.
            arguments: Mapping with the required ``url`` key; ``https://`` is
                prepended when the value has no http(s) scheme.

        Returns:
            A text response with the visited URL and the first 200 characters
            of the page's text content, or the failure message.
        """
        logger.info("开始浏览器导航操作")

        # Validate before touching the browser so a bad call does not launch
        # a browser window just to fail afterwards.
        url = (arguments or {}).get("url")
        if not url:
            logger.error("导航失败: missing required argument 'url'")
            return _text("导航失败: missing required argument 'url'")

        if not self._sessions:
            logger.warning("没有活跃的会话,正在创建新会话")
            await NewSessionToolHandler().handle("", {})
        try:
            page = self.get_page()
            url = _with_scheme(url)
            logger.info(f"正在导航到 URL: {url}")
            await page.goto(url)
            logger.debug(f"导航完成, 当前URL: {page.url}")

            # The text handler returns a list of content objects; join their
            # text so the preview below is limited to 200 *characters*.
            contents = await GetTextContentToolHandler().handle("", {})
            text_content = "".join(getattr(item, "text", "") for item in contents)
            return _text(f"Navigated to {url}\npage_text_content[:200]:\n\n{text_content[:200]}")
        except Exception as e:
            logger.error(f"导航失败: {str(e)}", exc_info=True)
            return _text(f"导航失败: {str(e)}")


class ScreenshotToolHandler(ToolHandler):
    """Capture the full page or a single element as a base64-encoded PNG."""

    name = "playwright_screenshot"
    description = "对当前页面或特定元素进行截图并返回图片内容"
    inputSchema = [
        Property(name="name", typ="string", description="截图文件名称,不需要包含扩展名"),
        Property(name="selector", typ="string", description="CSS选择器,用于指定要截图的页面元素,不填则截取整个页面", required=False)
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Take a screenshot and return it as image content.

        Args:
            name: Tool name as dispatched; replaced by ``arguments["name"]``.
            arguments: Mapping with ``name`` (base name of the temporary PNG
                written to the working directory) and optional ``selector``
                (CSS selector of the element to capture; whole page if empty).

        Returns:
            A single PNG image content item, or a text response describing
            the failure or the missing session.
        """
        logger.info("开始截图操作")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            return _text(_NO_SESSION_MESSAGE)
        try:
            page = self.get_page()
            arguments = arguments or {}
            name = arguments.get("name")
            selector = arguments.get("selector")
            logger.debug(f"截图参数 - 文件名: {name}, 选择器: {selector}")

            image_path = f"{name}.png"
            try:
                if selector:
                    logger.info(f"截取元素,选择器: {selector}")
                    # page.locator() is synchronous and returns a Locator;
                    # only the screenshot call itself is awaited.
                    element = page.locator(selector)
                    await element.screenshot(path=image_path)
                else:
                    logger.info("截取整个页面")
                    await page.screenshot(path=image_path, full_page=True)

                with open(image_path, "rb") as image_file:
                    encoded_string = base64.b64encode(image_file.read()).decode("utf-8")
            finally:
                # The PNG is only a transport file; remove it even when a
                # later step fails so it is not left behind on disk.
                try:
                    os.remove(image_path)
                except FileNotFoundError:
                    pass

            logger.debug(f"截图完成并编码为 base64,文件 {name}.png 已删除")
            return [types.ImageContent(type="image", data=encoded_string, mimeType="image/png")]
        except Exception as e:
            logger.error(f"截图失败: {str(e)}", exc_info=True)
            return _text(f"截图失败: {str(e)}")


class EvaluateToolHandler(ToolHandler):
    """Evaluate a JavaScript snippet in the active page."""

    name = "playwright_evaluate"
    description = "在浏览器控制台中执行JavaScript代码并返回执行结果"
    inputSchema = [
        Property(name="script", typ="string", description="需要在浏览器中执行的JavaScript代码")
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Run ``arguments["script"]`` via ``page.evaluate`` and report the result.

        Args:
            name: Tool name as dispatched; not used here.
            arguments: Mapping with the ``script`` key holding the JavaScript
                source to evaluate.

        Returns:
            A text response containing the evaluation result, the failure
            message, or the "no active session" notice.
        """
        logger.info("开始执行JavaScript代码")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            return _text(_NO_SESSION_MESSAGE)
        try:
            session_id = list(self._sessions.keys())[-1]
            page = self._sessions[session_id]["page"]
            script = arguments.get("script")
            logger.debug(f"执行脚本: {script}")
            result = await page.evaluate(script)
            logger.info(f"脚本执行完成,结果: {result}")
            return _text(f"Evaluated script, result: {result}")
        except Exception as e:
            logger.error(f"脚本执行失败: {str(e)}", exc_info=True)
            return _text(f"脚本执行失败: {str(e)}")


class GetTextContentToolHandler(ToolHandler):
    """Collect the de-duplicated text of the active page's sized elements."""

    name = "playwright_get_text_content"
    description = "获取当前页面中所有可见元素的文本内容,智能过滤重复内容"
    inputSchema = []

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Return the unique text snippets found on the active page.

        Args:
            name: Tool name as dispatched; not used here.
            arguments: Not used; this tool takes no input.

        Returns:
            A text response embedding the list of unique texts, the failure
            message, or the "no active session" notice.
        """
        logger.info("开始获取页面文本内容")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            return _text(_NO_SESSION_MESSAGE)
        try:
            session_id = list(self._sessions.keys())[-1]
            page = self._sessions[session_id]["page"]
            logger.debug(f"从页面获取文本, URL: {page.url}")

            logger.debug("执行JavaScript获取唯一文本")
            text_contents = await page.evaluate(_UNIQUE_TEXTS_JS)

            logger.info(f"获取到 {len(text_contents)} 个唯一文本元素")
            logger.debug(f"文本内容: {text_contents[:3]}...")
            return _text(f"Text content of all elements: {text_contents}")
        except Exception as e:
            logger.error(f"获取文本内容失败: {str(e)}", exc_info=True)
            return _text(f"获取文本内容失败: {str(e)}")


class GetHtmlContentToolHandler(ToolHandler):
    """Return the inner HTML of the element matching a CSS selector."""

    name = "playwright_get_html_content"
    description = "获取页面中指定元素的HTML内容"
    inputSchema = [
        Property(name="selector", typ="string", description="CSS选择器,用于定位需要获取HTML内容的页面元素")
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Fetch ``inner_html()`` of the element selected by ``arguments["selector"]``.

        Args:
            name: Tool name as dispatched; not used here.
            arguments: Mapping with the ``selector`` key (CSS selector).

        Returns:
            A text response containing the element's inner HTML, the failure
            message, or the "no active session" notice.
        """
        logger.info("开始获取HTML内容")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            return _text(_NO_SESSION_MESSAGE)
        try:
            session_id = list(self._sessions.keys())[-1]
            page = self._sessions[session_id]["page"]
            selector = arguments.get("selector")
            logger.debug(f"获取选择器 '{selector}' 的HTML内容")
            html_content = await page.locator(selector).inner_html()
            logger.debug(f"获取到HTML内容,长度: {len(html_content)}")
            return _text(f"HTML content of element with selector {selector}: {html_content}")
        except Exception as e:
            logger.error(f"获取HTML内容失败: {str(e)}", exc_info=True)
            return _text(f"获取HTML内容失败: {str(e)}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ziux/playwright-plus-python-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server