# action.py (21.2 kB)
import asyncio
import uuid
import base64
import os
import json
import logging
from typing import Any, Optional, Dict, List, Union
import mcp.types as types
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from playwright_server.tools.base import ToolHandler, Property
# Module-level logger used by the action tool handler.
logger = logging.getLogger('playwright_server.tools.action')
class _ArgumentError(ValueError):
    """Raised when a tool argument is missing or malformed; the message is returned to the caller."""


# Keyword options that the Playwright ``Locator`` method behind each action
# accepts. Options outside the whitelist are dropped instead of being passed
# through, because Locator methods raise ``TypeError`` on unknown keywords.
_NO_KWARGS = frozenset()
_TIMEOUT_ONLY = frozenset({"timeout"})
_INPUT_KWARGS = frozenset({"timeout", "no_wait_after", "force"})
_KEYBOARD_KWARGS = frozenset({"delay", "timeout", "no_wait_after"})
_POINTER_KWARGS = frozenset({"modifiers", "position", "timeout", "force", "no_wait_after"})
_CLICK_KWARGS = _POINTER_KWARGS | {"delay", "button"}
_CHECK_KWARGS = frozenset({"position", "timeout", "force", "no_wait_after"})

_ACTION_KWARGS = {
    "fill": _INPUT_KWARGS,
    "click": _CLICK_KWARGS,
    "dblclick": _CLICK_KWARGS,
    "hover": _POINTER_KWARGS,
    "tap": _POINTER_KWARGS,
    "check": _CHECK_KWARGS,
    "uncheck": _CHECK_KWARGS,
    "select_option": _INPUT_KWARGS,
    "press": _KEYBOARD_KWARGS,
    "type": _KEYBOARD_KWARGS,
    "focus": _TIMEOUT_ONLY,
    "blur": _TIMEOUT_ONLY,
    "drag_to": _INPUT_KWARGS,
    "screenshot": _TIMEOUT_ONLY,
    "get_text": _TIMEOUT_ONLY,
    "count": _NO_KWARGS,
    "is_visible": _TIMEOUT_ONLY,
    "is_enabled": _TIMEOUT_ONLY,
    "is_hidden": _TIMEOUT_ONLY,
    "is_disabled": _TIMEOUT_ONLY,
    "is_checked": _TIMEOUT_ONLY,
    "get_attribute": _TIMEOUT_ONLY,
    "evaluate": _TIMEOUT_ONLY,
    "wait_for": _TIMEOUT_ONLY,
    "press_sequentially": _KEYBOARD_KWARGS,
    "clear": _INPUT_KWARGS,
    "scroll_into_view": _TIMEOUT_ONLY,
    "set_input_files": frozenset({"timeout", "no_wait_after"}),
    "select_text": frozenset({"force", "timeout"}),
    "dispatch_event": _TIMEOUT_ONLY,
    "get_inner_text": _TIMEOUT_ONLY,
    "get_inner_html": _TIMEOUT_ONLY,
    "get_content": _TIMEOUT_ONLY,
    "all_inner_texts": _NO_KWARGS,
    "all_text_contents": _NO_KWARGS,
}

# Actions that operate on every matched element; they are never narrowed to
# the first match when strict mode is off.
_MULTI_MATCH_ACTIONS = frozenset({"count", "all_inner_texts", "all_text_contents"})

# action -> (Locator method, debug log line, success message)
_SIMPLE_ACTIONS = {
    "click": ("click", "点击元素", "点击成功"),
    "dblclick": ("dblclick", "双击元素", "双击成功"),
    "hover": ("hover", "悬停在元素上", "悬停成功"),
    "tap": ("tap", "轻触元素", "轻触成功"),
    "check": ("check", "选中复选框", "选中成功"),
    "uncheck": ("uncheck", "取消选中复选框", "取消选中成功"),
    "focus": ("focus", "聚焦元素", "聚焦成功"),
    "blur": ("blur", "失焦元素", "失焦成功"),
    "clear": ("clear", "清除元素内容", "清除内容成功"),
    "scroll_into_view": ("scroll_into_view_if_needed", "将元素滚动到视图", "滚动到元素成功"),
    "select_text": ("select_text", "选择元素中的文本", "选择文本成功"),
}

# action -> (Locator method taking ``value`` first, debug log label, success message)
_VALUE_ACTIONS = {
    "fill": ("fill", "填充文本", "填充成功"),
    "press": ("press", "按下按键", "按键成功"),
    "type": ("type", "输入文本", "输入文本成功"),
    "press_sequentially": ("press_sequentially", "序列按键输入", "序列按键输入成功"),
}

# action -> (Locator method, debug log line before the call, debug label for the result)
_READ_ACTIONS = {
    "get_text": ("text_content", "获取元素文本", "获取到的文本"),
    "count": ("count", "计算匹配元素数量", "匹配的元素数量"),
    "is_visible": ("is_visible", "检查元素是否可见", "元素可见性"),
    "is_enabled": ("is_enabled", "检查元素是否启用", "元素启用状态"),
    "is_hidden": ("is_hidden", "检查元素是否隐藏", "元素隐藏状态"),
    "is_disabled": ("is_disabled", "检查元素是否禁用", "元素禁用状态"),
    "is_checked": ("is_checked", "检查元素是否选中", "元素选中状态"),
    "get_inner_text": ("inner_text", "获取元素内部文本", "内部文本"),
    "get_content": ("text_content", "获取元素内容", "内容"),
}

# Same as _READ_ACTIONS, but only the length of the result is logged.
_SIZED_READ_ACTIONS = {
    "get_inner_html": ("inner_html", "获取元素内部HTML", "内部HTML长度"),
    "all_inner_texts": ("all_inner_texts", "获取所有内部文本", "内部文本数量"),
    "all_text_contents": ("all_text_contents", "获取所有文本内容", "文本内容数量"),
}


def _text(message: Any) -> list[types.TextContent]:
    """Wrap *message* in the single-item text response returned by the tool."""
    return [types.TextContent(type="text", text=f"{message}")]


def _parse_json_object(raw: Any, field: str) -> Dict[str, Any]:
    """Decode the JSON-object argument *field*; an empty or missing value yields ``{}``.

    Raises:
        _ArgumentError: if *raw* is not valid JSON or does not decode to an object.
    """
    if not raw:
        return {}
    if isinstance(raw, dict):
        # Tolerate clients that send the object itself instead of a JSON string.
        return raw
    logger.debug(f"解析{field}字符串: {raw}")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        logger.error(f"{field}参数格式错误: {str(e)}")
        raise _ArgumentError(f"{field}参数格式错误,必须是有效的JSON字符串") from e
    if not isinstance(parsed, dict):
        logger.error(f"{field}参数格式错误: 期望JSON对象, 实际为 {type(parsed).__name__}")
        raise _ArgumentError(f"{field}参数格式错误,必须是JSON对象")
    return parsed


def _parse_position(raw: str) -> Dict[str, float]:
    """Parse an ``'x,y'`` string into the ``{"x": ..., "y": ...}`` mapping Playwright expects.

    Raises:
        _ArgumentError: if *raw* is not exactly two comma-separated numbers.
    """
    logger.debug(f"解析位置参数: {raw}")
    try:
        x, y = map(float, raw.split(','))
    except ValueError as e:
        logger.error(f"位置格式错误: {str(e)}")
        raise _ArgumentError("位置格式错误,应为'x,y'") from e
    return {"x": x, "y": y}


def _build_action_options(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Collect the action options the caller explicitly supplied.

    Unset options are omitted so that Playwright's own defaults apply; the
    result is later filtered per action through ``_ACTION_KWARGS``.
    """
    action_options: Dict[str, Any] = {}
    for key in ("force", "delay", "timeout", "no_wait_after"):
        if arguments.get(key) is not None:
            action_options[key] = arguments[key]
    for key in ("modifiers", "button"):
        if arguments.get(key):
            action_options[key] = arguments[key]
    position = arguments.get("position")
    if position:
        action_options["position"] = _parse_position(position)
    return action_options


def _build_locator(page: Any, by: Any, by_value: Any, has_text: Optional[str], exact: Optional[bool]) -> Any:
    """Create the locator selected by *by*; returns ``None`` for an unsupported *by*."""
    locator_options = {} if exact is None else {"exact": exact}
    text_based = {
        "get_by_text": page.get_by_text,
        "get_by_placeholder": page.get_by_placeholder,
        "get_by_label": page.get_by_label,
        "get_by_alt_text": page.get_by_alt_text,
        "get_by_title": page.get_by_title,
    }
    if by in text_based:
        return text_based[by](by_value, **locator_options)
    if by == "get_by_role":
        # For roles, has_text is used as the accessible name filter.
        return page.get_by_role(by_value, name=has_text, **locator_options)
    if by == "get_by_test_id":
        return page.get_by_test_id(by_value)
    if by in ("get_by_css", "locator"):
        return page.locator(by_value, has_text=has_text)
    return None


class ActionToolHandler(ToolHandler):
    """MCP tool that resolves a Playwright locator and performs one action on it.

    The locator is chosen through ``by`` / ``by_value`` and the operation
    through ``action``; the remaining arguments tune that operation.
    """

    name = "playwright_action"
    description = "根据给定的locator 和action 执行操作"
    inputSchema = [
        Property(name="by", typ="string", description="选择元素方式,遵循playwright的选择器原则,可选项: get_by_text,get_by_placeholder,get_by_label,get_by_role,get_by_alt_text,get_by_title,get_by_test_id,get_by_css,locator", required=True),
        Property(name="by_value", typ="string", description="选择元素的值", required=True),
        Property(name="action", typ="string", description="操作类型,可选项: fill,click,dblclick,hover,tap,check,uncheck,select_option,press,type,focus,blur,drag_to,screenshot,get_text,count,is_visible,is_enabled,is_hidden,is_disabled,is_checked,get_attribute,evaluate,wait_for,press_sequentially,clear,scroll_into_view,set_input_files,select_text,dispatch_event,get_inner_text,get_inner_html,get_content,all_inner_texts,all_text_contents", required=True),
        Property(name="value", typ="string", description="操作的值", required=False),
        Property(name="force", typ="boolean", description="是否强制执行动作,无视actionability检查", required=False),
        Property(name="delay", typ="number", description="按键之间的延迟(毫秒)", required=False),
        Property(name="position", typ="string", description="相对于元素的点击位置,格式为: 'x,y'", required=False),
        Property(name="timeout", typ="number", description="操作超时时间(毫秒)", required=False),
        Property(name="selector", typ="string", description="拖拽目标元素的选择器,用于drag_to操作", required=False),
        Property(name="attribute_name", typ="string", description="要获取的属性名称,用于get_attribute操作", required=False),
        Property(name="script", typ="string", description="要在元素上执行的JavaScript代码,用于evaluate操作", required=False),
        Property(name="filename", typ="string", description="截图保存的文件名,用于screenshot操作", required=False),
        Property(name="no_wait_after", typ="boolean", description="是否不等待动作完成后的导航", required=False),
        Property(name="strict", typ="boolean", description="是否严格匹配元素(如果找到多个元素是否报错)", required=False),
        Property(name="state", typ="string", description="等待的元素状态,用于wait_for操作,可选: 'attached', 'detached', 'visible', 'hidden'", required=False),
        Property(name="modifiers", typ="array", description="修饰键,用于键盘操作,可选: 'Alt', 'Control', 'Meta', 'Shift'", required=False),
        Property(name="button", typ="string", description="鼠标按钮,用于点击操作,可选: 'left', 'right', 'middle'", required=False),
        Property(name="options", typ="string", description="选择框的选项,用于select_option操作,JSON格式的字符串,可以包含 value, label, index", required=False),
        Property(name="files", typ="string", description="要上传的文件路径,用于set_input_files操作,多个文件用逗号分隔", required=False),
        Property(name="event_name", typ="string", description="要分发的事件名称,用于dispatch_event操作", required=False),
        Property(name="event_data", typ="string", description="事件数据,JSON格式的字符串,用于dispatch_event操作", required=False),
        Property(name="exact", typ="boolean", description="是否精确匹配文本", required=False),
        Property(name="has_text", typ="string", description="元素必须包含的文本", required=False)
    ]

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Locate an element and run the requested action on it.

        Args:
            name: Tool name as dispatched by the server (not used here).
            arguments: Tool arguments as described by ``inputSchema``; ``None``
                is treated as an empty mapping.

        Returns:
            A single-item list: an ``ImageContent`` for ``screenshot``,
            otherwise a ``TextContent`` holding the result or an error
            message. Errors are reported in the response, not raised.
        """
        # The signature allows None; normalise so the lookups below cannot fail.
        arguments = arguments or {}
        by = arguments.get("by")
        by_value = arguments.get("by_value")
        action = arguments.get("action")
        logger.info(f"开始执行操作: by={by}, by_value={by_value}, action={action}")
        try:
            page = self.get_page()
            logger.debug(f"当前页面URL: {page.url}")

            strict = bool(arguments.get("strict", False))
            exact = arguments.get("exact")
            has_text = arguments.get("has_text")
            logger.debug(
                f"操作参数: force={arguments.get('force')}, delay={arguments.get('delay')}, "
                f"timeout={arguments.get('timeout')}, strict={strict}, exact={exact}"
            )

            # Parse the structured arguments up front so malformed input is
            # reported before anything touches the page.
            try:
                options = _parse_json_object(arguments.get("options"), "options")
                files_str = arguments.get("files")
                files: List[str] = []
                if files_str:
                    logger.debug(f"解析files字符串: {files_str}")
                    # Skip empty entries produced by stray or trailing commas.
                    files = [part.strip() for part in files_str.split(",") if part.strip()]
                event_data = _parse_json_object(arguments.get("event_data"), "event_data")
                action_options = _build_action_options(arguments)
            except _ArgumentError as e:
                return _text(str(e))
            logger.debug(f"构建的action_options: {action_options}")

            logger.info(f"开始定位元素: by={by}, by_value={by_value}")
            try:
                locator = _build_locator(page, by, by_value, has_text, exact)
            except Exception as e:
                logger.error(f"定位元素失败: {str(e)}", exc_info=True)
                return _text(f"定位元素失败: {str(e)}")
            if locator is None:
                logger.error(f"不支持的by类型: {by}")
                return _text(f"不支持的by类型: {by}")
            logger.debug("元素定位成功")

            logger.info(f"开始执行操作: action={action}")
            if action not in _ACTION_KWARGS:
                logger.error(f"不支持的action类型: {action}")
                return _text(f"不支持的action类型: {action}")

            # Locator methods are always strict and take no ``strict`` keyword;
            # the documented opt-out is to narrow to the first match.
            first_only = not strict and action not in _MULTI_MATCH_ACTIONS
            if first_only:
                locator = locator.first

            allowed = _ACTION_KWARGS[action]
            call_kwargs = {key: val for key, val in action_options.items() if key in allowed}

            try:
                return await self._run_action(
                    page, locator, action, call_kwargs, arguments,
                    options, files, event_data, first_only,
                )
            except _ArgumentError as e:
                logger.error(str(e))
                return _text(str(e))
            except PlaywrightTimeoutError as e:
                logger.error(f"操作超时: {str(e)}", exc_info=True)
                return _text(f"操作超时: {str(e)}")
            except Exception as e:
                logger.error(f"操作执行失败: {str(e)}", exc_info=True)
                return _text(f"操作执行失败: {str(e)}")
        except Exception as e:
            # Last-resort boundary: a tool call must always produce a response.
            logger.error(f"操作处理器全局异常: {str(e)}", exc_info=True)
            return _text(f"操作处理器发生错误: {str(e)}")

    async def _run_action(
        self,
        page: Any,
        locator: Any,
        action: str,
        call_kwargs: Dict[str, Any],
        arguments: Dict[str, Any],
        options: Dict[str, Any],
        files: List[str],
        event_data: Dict[str, Any],
        first_only: bool,
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Execute *action* on *locator* and build the tool response.

        ``call_kwargs`` must already be filtered to the keywords the target
        Locator method accepts.

        Raises:
            _ArgumentError: if an argument required by *action* is missing.
        """
        value = arguments.get("value")

        if action in _SIMPLE_ACTIONS:
            method, note, result = _SIMPLE_ACTIONS[action]
            logger.debug(note)
            await getattr(locator, method)(**call_kwargs)
        elif action in _VALUE_ACTIONS:
            method, label, result = _VALUE_ACTIONS[action]
            # An empty string is a legitimate value (e.g. fill("") clears a field).
            if value is None:
                raise _ArgumentError(f"{action}操作需要提供value参数")
            logger.debug(f"{label}: {value}")
            await getattr(locator, method)(value, **call_kwargs)
        elif action in _READ_ACTIONS:
            method, note, label = _READ_ACTIONS[action]
            logger.debug(note)
            result = await getattr(locator, method)(**call_kwargs)
            logger.debug(f"{label}: {result}")
        elif action in _SIZED_READ_ACTIONS:
            method, note, label = _SIZED_READ_ACTIONS[action]
            logger.debug(note)
            result = await getattr(locator, method)(**call_kwargs)
            logger.debug(f"{label}: {len(result)}")
        elif action == "select_option":
            if options:
                logger.debug(f"选择选项(使用options): {options}")
                # Merge instead of double-splatting so a duplicate key cannot raise.
                await locator.select_option(**{**options, **call_kwargs})
            elif value is not None:
                logger.debug(f"选择选项(使用value): {value}")
                await locator.select_option(value=value, **call_kwargs)
            else:
                raise _ArgumentError("select_option操作需要提供options或value参数")
            result = "选择选项成功"
        elif action == "drag_to":
            selector = arguments.get("selector")
            if not selector:
                raise _ArgumentError("drag_to操作需要提供selector参数")
            logger.debug(f"拖拽元素到: {selector}")
            target = page.locator(selector)
            if first_only:
                target = target.first
            await locator.drag_to(target, **call_kwargs)
            result = "拖拽成功"
        elif action == "screenshot":
            # Use the returned bytes directly rather than writing to (and then
            # deleting) a caller-supplied path, which could clobber an existing
            # file. ``filename`` stays in the schema for compatibility only.
            logger.debug("元素截图")
            image = await locator.screenshot(**call_kwargs)
            logger.info(f"截图完成,文件大小: {len(image)} 字节")
            encoded = base64.b64encode(image).decode("utf-8")
            return [types.ImageContent(type="image", data=encoded, mimeType="image/png")]
        elif action == "get_attribute":
            attribute_name = arguments.get("attribute_name")
            if not attribute_name:
                raise _ArgumentError("get_attribute操作需要提供attribute_name参数")
            logger.debug(f"获取元素属性: {attribute_name}")
            result = await locator.get_attribute(attribute_name, **call_kwargs)
            logger.debug(f"属性值: {result}")
        elif action == "evaluate":
            script = arguments.get("script")
            if not script:
                raise _ArgumentError("evaluate操作需要提供script参数")
            # NOTE(review): runs caller-supplied JavaScript in the page; this is
            # the purpose of the action, so callers must be trusted.
            logger.debug(f"在元素上执行脚本: {script}")
            result = await locator.evaluate(script, **call_kwargs)
            logger.debug(f"脚本执行结果: {result}")
        elif action == "wait_for":
            state = arguments.get("state") or "visible"
            logger.debug(f"等待元素状态: {state}")
            await locator.wait_for(state=state, **call_kwargs)
            result = f"等待元素状态 '{state}' 成功"
        elif action == "set_input_files":
            if not files:
                raise _ArgumentError("没有指定要上传的文件")
            logger.debug(f"设置文件输入: {files}")
            await locator.set_input_files(files, **call_kwargs)
            result = "设置文件输入成功"
        elif action == "dispatch_event":
            event_name = arguments.get("event_name")
            if not event_name:
                raise _ArgumentError("没有指定事件名称")
            logger.debug(f"分发事件: {event_name}, 数据: {event_data}")
            await locator.dispatch_event(event_name, event_data, **call_kwargs)
            result = f"分发事件 '{event_name}' 成功"
        else:
            # Unreachable while the tables above cover every key of _ACTION_KWARGS.
            raise _ArgumentError(f"不支持的action类型: {action}")

        logger.info(f"操作执行成功: {action}")
        return _text(result)