handles.py•12.9 kB
import asyncio
import base64
import functools
import logging
import os
import uuid
from typing import Any

import mcp.types as types
from playwright.async_api import async_playwright

from playwright_server.tools.base import ToolHandler, Property
# Configure logging: records are sent to both the console and the file
# ``playwright_server.log`` in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        # The messages logged by this module contain non-ASCII text, so the
        # file encoding is pinned instead of relying on the platform default.
        logging.FileHandler('playwright_server.log', encoding='utf-8'),
    ],
)
# Logger shared by every tool handler defined in this module.
logger = logging.getLogger('playwright_server.tools')
def update_page_after_click(func):
async def wrapper(self, name: str, arguments: dict | None):
logger.debug(f"执行装饰器 update_page_after_click,方法:{func.__name__}")
if not self._sessions:
logger.warning("没有活跃的会话。需要先创建一个新会话。")
return [types.TextContent(type="text", text="No active session. Please create a new session first.")]
session_id = list(self._sessions.keys())[-1]
page = self._sessions[session_id]["page"]
logger.info(f"监听页面事件 'page',当前URL: {page.url}")
new_page_future = asyncio.ensure_future(page.context.wait_for_event("page", timeout=3000))
result = await func(self, name, arguments)
try:
logger.debug("等待新页面加载...")
new_page = await new_page_future
await new_page.wait_for_load_state()
self._sessions[session_id]["page"] = new_page
logger.info(f"页面已更新,新URL: {new_page.url}")
except Exception as e:
logger.debug(f"没有检测到页面切换: {str(e)}")
# if page.url != self._sessions[session_id]["page"].url:
# await page.wait_for_load_state()
# self._sessions[session_id]["page"] = page
return result
return wrapper
class NewSessionToolHandler(ToolHandler):
    """Tool handler that launches a Chromium window and registers it as a session.

    NOTE(review): ``self._sessions`` is not defined in this class; it is
    presumably a registry provided by ``ToolHandler`` — confirm.
    """

    name = "playwright_new_session"
    description = "创建新的浏览器会话,打开一个浏览器窗口并可选择性地访问指定网址"
    inputSchema = [
        Property(name="url", typ="string", description="需要访问的初始网址,可选参数,不填则只打开浏览器", required=False)
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Start Playwright, open a headed Chromium page and optionally load a URL.

        Args:
            name: Tool name supplied by the caller; not used here.
            arguments: Optional mapping. ``"url"`` is the first page to open;
                a value without an ``http://`` or ``https://`` prefix gets
                ``https://`` prepended. ``None`` is treated as an empty mapping.

        Returns:
            A one-element list with a text content: ``"succ"`` on success,
            otherwise a message describing the failure.
        """
        logger.info("开始创建新的浏览器会话")
        playwright = None
        browser = None
        session_registered = False
        try:
            playwright = await async_playwright().start()
            self._playwright = playwright
            logger.debug("Playwright 实例已创建")
            browser = await playwright.chromium.launch(headless=False)
            logger.debug("浏览器已启动")
            page = await browser.new_page()
            logger.debug("新页面已创建")

            session_id = str(uuid.uuid4())
            self._sessions[session_id] = {"browser": browser, "page": page}
            session_registered = True
            logger.info(f"会话已创建,ID: {session_id}")

            # The signature allows ``arguments`` to be None; read it as "no url".
            url = (arguments or {}).get("url")
            if url:
                if not url.startswith(("http://", "https://")):
                    url = "https://" + url
                logger.info(f"正在导航到 URL: {url}")
                await page.goto(url)
                logger.debug(f"导航完成, 当前URL: {page.url}")
            return [types.TextContent(type="text", text="succ")]
        except Exception as e:
            logger.error(f"创建会话失败: {str(e)}", exc_info=True)
            # A session that was already registered stays available even when
            # the initial navigation failed. Resources that never became a
            # session are released so failed launches do not pile up.
            if not session_registered:
                await self._dispose_unregistered(playwright, browser)
            return [types.TextContent(type="text", text=f"创建会话失败: {str(e)}")]

    async def _dispose_unregistered(self, playwright, browser) -> None:
        """Best-effort shutdown of a browser/driver that never became a session."""
        if browser is not None:
            try:
                await browser.close()
            except Exception as close_error:
                logger.warning(f"Failed to close unregistered browser: {close_error}")
        if playwright is not None:
            try:
                await playwright.stop()
            except Exception as stop_error:
                logger.warning(f"Failed to stop unregistered Playwright driver: {stop_error}")
class NavigateToolHandler(ToolHandler):
    """Tool handler that navigates the current page to a URL.

    When no session exists, one is created first through
    ``NewSessionToolHandler``.

    NOTE(review): ``self._sessions`` and ``self.get_page()`` are not defined
    in this class; presumably they come from ``ToolHandler`` — confirm.
    """

    name = "playwright_navigate"
    description = "浏览器导航到指定网址,如果没有活跃的浏览器会话,会自动创建一个新会话"
    inputSchema = [
        Property(name="url", typ="string", description="需要访问的网址,如不包含http或https前缀将自动添加https://")
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Navigate to ``arguments["url"]`` and report a short preview of the page text.

        Args:
            name: Tool name supplied by the caller; not used here.
            arguments: Mapping holding the required ``"url"`` entry. A value
                without an ``http://`` or ``https://`` prefix gets ``https://``
                prepended.

        Returns:
            A one-element list with a text content: the visited URL followed
            by the first 200 characters of the text reported by
            ``GetTextContentToolHandler``, or a failure message.
        """
        logger.info("开始浏览器导航操作")
        url = (arguments or {}).get("url")
        if not url:
            # Reject the call up front instead of failing later on None.startswith.
            logger.warning("Navigation requested without a 'url' argument")
            return [types.TextContent(type="text", text="导航失败: missing required argument 'url'")]

        if not self._sessions:
            logger.warning("没有活跃的会话,正在创建新会话")
            await NewSessionToolHandler().handle("", {})
        try:
            page = self.get_page()
            if not url.startswith(("http://", "https://")):
                url = "https://" + url
            logger.info(f"正在导航到 URL: {url}")
            await page.goto(url)
            logger.debug(f"导航完成, 当前URL: {page.url}")

            contents = await GetTextContentToolHandler().handle("", {})
            # handle() returns a list of content objects, so the preview must be
            # cut from the first item's text; slicing the list itself would
            # embed the repr of the whole list in the reply.
            page_text = getattr(contents[0], "text", "") if contents else ""
            preview = page_text[:200]
            return [types.TextContent(type="text", text=f"Navigated to {url}\npage_text_content[:200]:\n\n{preview}")]
        except Exception as e:
            logger.error(f"导航失败: {str(e)}", exc_info=True)
            return [types.TextContent(type="text", text=f"导航失败: {str(e)}")]
class ScreenshotToolHandler(ToolHandler):
    """Tool handler that captures the page, or one element, as a PNG image.

    NOTE(review): ``self._sessions`` and ``self.get_page()`` are not defined
    in this class; presumably they come from ``ToolHandler`` — confirm.
    """

    name = "playwright_screenshot"
    description = "对当前页面或特定元素进行截图并返回图片内容"
    inputSchema = [
        Property(name="name", typ="string", description="截图文件名称,不需要包含扩展名"),
        Property(name="selector", typ="string", description="CSS选择器,用于指定要截图的页面元素,不填则截取整个页面", required=False)
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Take a screenshot and return it base64-encoded.

        The image is written to ``<name>.png`` in the working directory, read
        back, and the file is removed again whether or not the capture worked.

        Args:
            name: Tool name supplied by the caller; not used here.
            arguments: Mapping with ``"name"`` (file name without extension)
                and an optional ``"selector"`` (CSS selector of the element to
                capture; the full page is captured when it is absent).

        Returns:
            A one-element list holding a PNG image content on success, or a
            text content describing the failure or the missing session.
        """
        logger.info("开始截图操作")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            return [types.TextContent(type="text", text="No active session. Please create a new session first.")]
        try:
            page = self.get_page()
            options = arguments or {}
            # Kept in its own variable so the ``name`` parameter is not shadowed.
            file_stem = options.get("name")
            selector = options.get("selector")
            logger.debug(f"截图参数 - 文件名: {file_stem}, 选择器: {selector}")

            image_path = f"{file_stem}.png"
            try:
                if selector:
                    logger.info(f"截取元素,选择器: {selector}")
                    # page.locator() returns a Locator synchronously; it is not
                    # awaitable, only the screenshot call is.
                    element = page.locator(selector)
                    await element.screenshot(path=image_path)
                else:
                    logger.info("截取整个页面")
                    await page.screenshot(path=image_path, full_page=True)
                with open(image_path, "rb") as image_file:
                    encoded_string = base64.b64encode(image_file.read()).decode("utf-8")
            finally:
                # Remove the temporary file even when capturing or reading failed.
                if os.path.exists(image_path):
                    os.remove(image_path)
            logger.debug(f"截图完成并编码为 base64,文件 {file_stem}.png 已删除")
            return [types.ImageContent(type="image", data=encoded_string, mimeType="image/png")]
        except Exception as e:
            logger.error(f"截图失败: {str(e)}", exc_info=True)
            return [types.TextContent(type="text", text=f"截图失败: {str(e)}")]
class EvaluateToolHandler(ToolHandler):
    """Tool handler that runs a JavaScript snippet in the current page.

    NOTE(review): ``self._sessions`` is not defined in this class; presumably
    it comes from ``ToolHandler`` — confirm.
    """

    name = "playwright_evaluate"
    description = "在浏览器控制台中执行JavaScript代码并返回执行结果"
    inputSchema = [
        Property(
            name="script",
            typ="string",
            description="需要在浏览器中执行的JavaScript代码",
        ),
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Evaluate ``arguments["script"]`` on the page of the latest session.

        Returns:
            A one-element list with a text content carrying either the
            evaluation result or a failure / missing-session message.
        """
        logger.info("开始执行JavaScript代码")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            no_session = types.TextContent(
                type="text",
                text="No active session. Please create a new session first.",
            )
            return [no_session]

        try:
            # The last key is the most recently added session.
            *_, latest_id = self._sessions.keys()
            current_page = self._sessions[latest_id]["page"]
            js_source = arguments.get("script")
            logger.debug(f"执行脚本: {js_source}")
            outcome = await current_page.evaluate(js_source)
            logger.info(f"脚本执行完成,结果: {outcome}")
            reply = types.TextContent(
                type="text",
                text=f"Evaluated script, result: {outcome}",
            )
            return [reply]
        except Exception as exc:
            reason = str(exc)
            logger.error(f"脚本执行失败: {reason}", exc_info=True)
            return [types.TextContent(type="text", text=f"脚本执行失败: {reason}")]
class GetTextContentToolHandler(ToolHandler):
    """Tool handler that collects de-duplicated text from the current page.

    NOTE(review): ``self._sessions`` is not defined in this class; presumably
    it comes from ``ToolHandler`` — confirm.
    """

    name = "playwright_get_text_content"
    description = "获取当前页面中所有可见元素的文本内容,智能过滤重复内容"
    inputSchema = []

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Return the unique texts found on the page of the latest session.

        The in-page script walks every element with a non-zero offset width or
        height and at most three descendants, collecting its trimmed
        ``innerText`` (when non-empty and at most 1000 characters) and its
        ``value`` attribute (when set) into a set.

        Returns:
            A one-element list with a text content carrying either the list
            of collected texts or a failure / missing-session message.
        """
        logger.info("开始获取页面文本内容")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            no_session = types.TextContent(
                type="text",
                text="No active session. Please create a new session first.",
            )
            return [no_session]

        # Script evaluated inside the page; its text is passed to the browser
        # unchanged.
        collect_script = '''() => {
var elements = Array.from(document.querySelectorAll('*')); // 先选择所有元素,再进行过滤
var uniqueTexts = new Set();
for (var element of elements) {
if (element.offsetWidth > 0 || element.offsetHeight > 0) { // 判断是否可见
var childrenCount = element.querySelectorAll('*').length;
if (childrenCount <= 3) {
var innerText = element.innerText ? element.innerText.trim() : '';
if (innerText && innerText.length <= 1000) {
uniqueTexts.add(innerText);
}
var value = element.getAttribute('value');
if (value) {
uniqueTexts.add(value);
}
}
}
}
//console.log( Array.from(uniqueTexts));
return Array.from(uniqueTexts);
}
'''

        try:
            # The last key is the most recently added session.
            *_, latest_id = self._sessions.keys()
            current_page = self._sessions[latest_id]["page"]
            logger.debug(f"从页面获取文本, URL: {current_page.url}")
            logger.debug("执行JavaScript获取唯一文本")
            collected = await current_page.evaluate(collect_script)
            logger.info(f"获取到 {len(collected)} 个唯一文本元素")
            logger.debug(f"文本内容: {collected[:3]}...")
            reply = types.TextContent(
                type="text",
                text=f"Text content of all elements: {collected}",
            )
            return [reply]
        except Exception as exc:
            reason = str(exc)
            logger.error(f"获取文本内容失败: {reason}", exc_info=True)
            return [types.TextContent(type="text", text=f"获取文本内容失败: {reason}")]
class GetHtmlContentToolHandler(ToolHandler):
    """Tool handler that returns the inner HTML of a selected element.

    NOTE(review): ``self._sessions`` is not defined in this class; presumably
    it comes from ``ToolHandler`` — confirm.
    """

    name = "playwright_get_html_content"
    description = "获取页面中指定元素的HTML内容"
    inputSchema = [
        Property(
            name="selector",
            typ="string",
            description="CSS选择器,用于定位需要获取HTML内容的页面元素",
        ),
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Fetch the inner HTML of ``arguments["selector"]`` on the latest session's page.

        Returns:
            A one-element list with a text content carrying either the HTML
            of the matched element or a failure / missing-session message.
        """
        logger.info("开始获取HTML内容")
        if not self._sessions:
            logger.warning("没有活跃的会话。需要先创建一个新会话。")
            no_session = types.TextContent(
                type="text",
                text="No active session. Please create a new session first.",
            )
            return [no_session]

        try:
            # The last key is the most recently added session.
            *_, latest_id = self._sessions.keys()
            current_page = self._sessions[latest_id]["page"]
            selector = arguments.get("selector")
            logger.debug(f"获取选择器 '{selector}' 的HTML内容")
            target = current_page.locator(selector)
            html_content = await target.inner_html()
            logger.debug(f"获取到HTML内容,长度: {len(html_content)}")
            reply = types.TextContent(
                type="text",
                text=f"HTML content of element with selector {selector}: {html_content}",
            )
            return [reply]
        except Exception as exc:
            reason = str(exc)
            logger.error(f"获取HTML内容失败: {reason}", exc_info=True)
            return [types.TextContent(type="text", text=f"获取HTML内容失败: {reason}")]