# selenium_tools.py (32 kB)
"""
Herramientas MCP para interactuar con Selenium WebDriver.
"""
import os
import base64
import json
from typing import Dict, Any, Optional, List
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.remote.webelement import WebElement
from selenium.common.exceptions import (
TimeoutException,
NoSuchElementException,
WebDriverException,
ElementNotInteractableException
)
from config import ProxyConfig, BrowserOptions, DetectionEvasionConfig
from browser_manager import WebDriverManager
from session_manager import SessionManager
class SeleniumTools:
"""Herramientas para interactuar con Selenium WebDriver."""
def __init__(self, session_manager: SessionManager):
    """Store the session manager and prepare driver creation and locator lookup.

    Args:
        session_manager: Object the tool methods use to create, look up and
            close browser sessions.
    """
    self.session_manager = session_manager
    self.webdriver_manager = WebDriverManager()
    # Locator strategy names accepted by the tool methods, mapped to the
    # Selenium ``By`` constants they stand for.
    self.locator_strategies = dict(
        id=By.ID,
        name=By.NAME,
        class_name=By.CLASS_NAME,
        tag_name=By.TAG_NAME,
        css_selector=By.CSS_SELECTOR,
        xpath=By.XPATH,
        link_text=By.LINK_TEXT,
        partial_link_text=By.PARTIAL_LINK_TEXT,
    )
def start_browser(
    self,
    browser_type: str = "chrome",
    options: Optional[Dict[str, Any]] = None,
    proxy: Optional[Dict[str, str]] = None,
    detection_evasion: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Start a browser and register it as a new session.

    Args:
        browser_type: Browser name forwarded to the WebDriver manager and
            echoed back in the result.
        options: Keyword arguments for ``BrowserOptions``; it is built with
            no arguments when omitted.
        proxy: Keyword arguments for ``ProxyConfig``; no proxy configuration
            is passed when omitted or empty.
        detection_evasion: Keyword arguments for ``DetectionEvasionConfig``;
            no evasion configuration is passed when omitted or empty.

    Returns:
        ``{"success": True, "session_id", "browser_type", "message"}`` on
        success, otherwise ``{"success": False, "error", "message"}``.
        Exceptions are never propagated to the caller.
    """
    try:
        # Turn the plain dictionaries into configuration objects.
        browser_options = BrowserOptions(**(options or {}))
        proxy_config = ProxyConfig(**proxy) if proxy else None
        evasion_config = (
            DetectionEvasionConfig(**detection_evasion) if detection_evasion else None
        )

        result = self.webdriver_manager.create_driver(
            browser_type=browser_type,
            browser_options=browser_options,
            proxy_config=proxy_config,
            detection_evasion=evasion_config
        )
        driver = result["driver"]

        try:
            user_data_dir = result["user_data_dir"]
            session_id = self.session_manager.create_session(
                driver, browser_type, user_data_dir
            )
        except Exception:
            # create_driver already handed back a driver; if it cannot be
            # registered nothing else references it, so shut it down here
            # instead of leaking the browser.
            try:
                driver.quit()
            except Exception:
                pass  # best-effort cleanup; the original error is reported below
            raise

        return {
            "success": True,
            "session_id": session_id,
            "browser_type": browser_type,
            "message": f"Navegador {browser_type} iniciado exitosamente"
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"Error al iniciar navegador {browser_type}"
        }
def navigate_to_url(self, session_id: str, url: str) -> Dict[str, Any]:
    """Load ``url`` in the session's browser and report the resulting page.

    Returns:
        ``{"success": True, "current_url", "title", "message"}`` after the
        page loads, or ``{"success": False, "error", ...}`` when the session
        is unknown or the driver raises.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    try:
        browser = session.driver
        browser.get(url)
        # Read back URL and title from the driver after the load.
        landed_on = browser.current_url
        page_title = browser.title
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "message": f"Error al navegar a {url}"
        }

    return {
        "success": True,
        "current_url": landed_on,
        "title": page_title,
        "message": f"Navegado exitosamente a {url}"
    }
def find_element(
    self,
    session_id: str,
    strategy: str,
    value: str,
    timeout: int = 10,
    multiple: bool = False
) -> Dict[str, Any]:
    """Wait for one or all elements matching a locator and describe them.

    Args:
        session_id: Session whose driver is searched.
        strategy: Key of ``self.locator_strategies``.
        value: Locator value used with the strategy.
        timeout: Seconds passed to ``WebDriverWait``.
        multiple: When true, describe every match; otherwise only one.

    Returns:
        A result dict. With ``multiple`` it carries ``elements_found`` and an
        ``elements`` list (each entry has an ``index``); otherwise a single
        ``element`` entry. Failures yield ``{"success": False, "error", ...}``.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    if strategy not in self.locator_strategies:
        return {
            "success": False,
            "error": f"Estrategia no válida. Disponibles: {list(self.locator_strategies.keys())}"
        }

    def describe(node):
        # Snapshot of the element's state, shared by both result shapes.
        return {
            "tag_name": node.tag_name,
            "text": node.text,
            "is_displayed": node.is_displayed(),
            "is_enabled": node.is_enabled(),
            "location": node.location,
            "size": node.size,
            "attributes": self._get_element_attributes(node)
        }

    try:
        locator = (self.locator_strategies[strategy], value)
        waiter = WebDriverWait(session.driver, timeout)

        if not multiple:
            found = waiter.until(EC.presence_of_element_located(locator))
            return {
                "success": True,
                "element": describe(found),
                "message": "Elemento encontrado exitosamente"
            }

        matches = waiter.until(EC.presence_of_all_elements_located(locator))
        summaries = [
            {"index": position, **describe(node)}
            for position, node in enumerate(matches)
        ]
        return {
            "success": True,
            "elements_found": len(matches),
            "elements": summaries,
            "message": f"Encontrados {len(matches)} elementos"
        }
    except TimeoutException:
        return {
            "success": False,
            "error": "Timeout: Elemento no encontrado",
            "message": f"No se pudo encontrar elemento con {strategy}='{value}' en {timeout} segundos"
        }
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "message": "Error al buscar elemento"
        }
def click_element(
    self,
    session_id: str,
    strategy: str,
    value: str,
    timeout: int = 10,
    element_index: int = 0
) -> Dict[str, Any]:
    """Scroll an element into view, wait until it is clickable and click it.

    Args:
        session_id: Session whose driver performs the click.
        strategy: Locator strategy name.
        value: Locator value.
        timeout: Seconds used both to find the element and to wait for it
            to become clickable.
        element_index: Which match to use when several elements match.

    Returns:
        ``{"success": True, "message"}`` or ``{"success": False, "error", ...}``.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    try:
        target = self._find_element_helper(
            session.driver, strategy, value, timeout, element_index
        )
        # Bring the element into the viewport before waiting on it.
        session.driver.execute_script("arguments[0].scrollIntoView(true);", target)
        ready = WebDriverWait(session.driver, timeout).until(
            EC.element_to_be_clickable(target)
        )
        ready.click()
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "message": "Error al hacer clic en el elemento"
        }

    return {
        "success": True,
        "message": "Elemento clickeado exitosamente"
    }
def type_text(
    self,
    session_id: str,
    strategy: str,
    value: str,
    text: str,
    timeout: int = 10,
    element_index: int = 0,
    clear_first: bool = True
) -> Dict[str, Any]:
    """Type ``text`` into an element, optionally clearing it beforehand.

    Args:
        session_id: Session whose driver is used.
        strategy: Locator strategy name.
        value: Locator value.
        text: Text sent to the element.
        timeout: Seconds allowed for locating the element.
        element_index: Which match to use when several elements match.
        clear_first: Call ``clear()`` on the element before typing.

    Returns:
        ``{"success": True, "text_typed", "message"}`` or
        ``{"success": False, "error", "message"}``.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    try:
        field = self._find_element_helper(
            session.driver, strategy, value, timeout, element_index
        )
        # Bring the element into the viewport before interacting with it.
        session.driver.execute_script("arguments[0].scrollIntoView(true);", field)
        if clear_first:
            field.clear()
        field.send_keys(text)
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "message": "Error al escribir texto"
        }

    return {
        "success": True,
        "text_typed": text,
        "message": "Texto escrito exitosamente"
    }
def take_screenshot(
    self,
    session_id: str,
    file_path: Optional[str] = None,
    element_strategy: Optional[str] = None,
    element_value: Optional[str] = None,
    element_index: int = 0,
    timeout: int = 10
) -> Dict[str, Any]:
    """Capture the page, or a single element, as a PNG screenshot.

    Args:
        session_id: Session whose driver takes the capture.
        file_path: Where to save the image. When omitted the base64 data is
            returned instead of being written to disk.
        element_strategy: Locator strategy of the element to capture.
        element_value: Locator value of the element to capture. The whole
            page is captured unless both strategy and value are given.
        element_index: Which match to use when several elements match.
        timeout: Seconds allowed for locating the element (default 10, the
            value that used to be hard-coded).

    Returns:
        ``{"success": True, "file_path", "message"}`` when saved to a file,
        ``{"success": True, "screenshot_base64", "message"}`` otherwise, or
        ``{"success": False, "error", "message"}`` on failure.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    try:
        if element_strategy and element_value:
            element = self._find_element_helper(
                session.driver, element_strategy, element_value, timeout, element_index
            )
            screenshot_data = element.screenshot_as_base64
        else:
            screenshot_data = session.driver.get_screenshot_as_base64()

        if not file_path:
            return {
                "success": True,
                "screenshot_base64": screenshot_data,
                "message": "Captura tomada exitosamente"
            }

        # Create the destination directory so a not-yet-existing folder does
        # not turn a successful capture into a write error.
        parent_dir = os.path.dirname(os.path.abspath(file_path))
        os.makedirs(parent_dir, exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(base64.b64decode(screenshot_data))
        return {
            "success": True,
            "file_path": file_path,
            "message": f"Captura guardada en {file_path}"
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": "Error al tomar captura de pantalla"
        }
def upload_file(
    self,
    session_id: str,
    strategy: str,
    value: str,
    file_path: str,
    timeout: int = 10,
    element_index: int = 0
) -> Dict[str, Any]:
    """Attach a local file to an ``<input type="file">`` element.

    Args:
        session_id: Session whose driver is used.
        strategy: Locator strategy name of the file input.
        value: Locator value of the file input.
        file_path: Path of an existing regular file to upload.
        timeout: Seconds allowed for locating the element.
        element_index: Which match to use when several elements match.

    Returns:
        ``{"success": True, "file_path", "message"}`` on success, otherwise
        ``{"success": False, "error", ...}``.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    # isfile rather than exists: a directory is not something that can be
    # sent to a file input, so reject it up front with a clear error.
    if not os.path.isfile(file_path):
        return {"success": False, "error": f"Archivo no encontrado: {file_path}"}

    try:
        element = self._find_element_helper(session.driver, strategy, value, timeout, element_index)

        # Only file inputs accept a path through send_keys; compare the type
        # case-insensitively and tolerate a missing attribute.
        input_type = (element.get_attribute("type") or "").lower()
        if element.tag_name.lower() != "input" or input_type != "file":
            return {
                "success": False,
                "error": "El elemento no es un input de tipo file"
            }

        element.send_keys(os.path.abspath(file_path))
        return {
            "success": True,
            "file_path": file_path,
            "message": "Archivo subido exitosamente"
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": "Error al subir archivo"
        }
def execute_script(
    self,
    session_id: str,
    script: str,
    *args
) -> Dict[str, Any]:
    """Run JavaScript in the session's browser and return its result.

    Args:
        session_id: Session whose driver runs the script.
        script: JavaScript source passed to the driver.
        *args: Extra arguments forwarded unchanged to the driver's
            ``execute_script``.

    Returns:
        ``{"success": True, "result", "message"}`` or
        ``{"success": False, "error", "message"}``.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    try:
        outcome = session.driver.execute_script(script, *args)
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "message": "Error al ejecutar script"
        }

    return {
        "success": True,
        "result": outcome,
        "message": "Script ejecutado exitosamente"
    }
def perform_mouse_action(
    self,
    session_id: str,
    action_type: str,
    strategy: Optional[str] = None,
    value: Optional[str] = None,
    element_index: int = 0,
    x_offset: int = 0,
    y_offset: int = 0,
    target_strategy: Optional[str] = None,
    target_value: Optional[str] = None,
    target_index: int = 0
) -> Dict[str, Any]:
    """Perform a mouse gesture: hover, drag and drop, right click or double click.

    Args:
        session_id: Session whose driver performs the gesture.
        action_type: One of ``hover``, ``drag_and_drop``, ``right_click``,
            ``double_click``.
        strategy: Locator strategy of the element acted on (the drag source
            for ``drag_and_drop``).
        value: Locator value of that element.
        element_index: Which match to use for that element.
        x_offset: Horizontal offset for a hover without an element.
        y_offset: Vertical offset for a hover without an element.
        target_strategy: Locator strategy of the drop target.
        target_value: Locator value of the drop target.
        target_index: Which match to use for the drop target.

    Returns:
        ``{"success": True, "action_type", "message"}`` or
        ``{"success": False, "error", ...}``.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    def locate(by_name, locator_value, index):
        # Element lookups in this method all use a fixed 10 second wait.
        return self._find_element_helper(session.driver, by_name, locator_value, 10, index)

    try:
        chain = ActionChains(session.driver)
        on_element = strategy and value

        if action_type == "drag_and_drop":
            if not (on_element and target_strategy and target_value):
                return {
                    "success": False,
                    "error": "Para drag_and_drop se requieren elementos origen y destino"
                }
            origin = locate(strategy, value, element_index)
            destination = locate(target_strategy, target_value, target_index)
            chain.drag_and_drop(origin, destination)
        elif action_type == "hover":
            if on_element:
                chain.move_to_element(locate(strategy, value, element_index))
            else:
                chain.move_by_offset(x_offset, y_offset)
        elif action_type in ("right_click", "double_click"):
            gesture = (
                chain.context_click if action_type == "right_click" else chain.double_click
            )
            # Without an element the gesture is queued with no argument.
            if on_element:
                gesture(locate(strategy, value, element_index))
            else:
                gesture()
        else:
            return {
                "success": False,
                "error": f"Tipo de acción no válida: {action_type}. Disponibles: hover, drag_and_drop, right_click, double_click"
            }

        chain.perform()
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "message": f"Error al realizar acción {action_type}"
        }

    return {
        "success": True,
        "action_type": action_type,
        "message": f"Acción {action_type} realizada exitosamente"
    }
def send_keys(
    self,
    session_id: str,
    keys: str,
    strategy: Optional[str] = None,
    value: Optional[str] = None,
    element_index: int = 0
) -> Dict[str, Any]:
    """Send a named special key, or literal text, to an element or the browser.

    Args:
        session_id: Session whose driver is used.
        keys: A special key name such as ``ENTER`` or ``TAB`` (matched
            case-insensitively), or any other string, which is sent as-is.
        strategy: Locator strategy of the receiving element.
        value: Locator value of the receiving element. When strategy or
            value is missing the keys go through an ``ActionChains`` instead.
        element_index: Which match to use when several elements match.

    Returns:
        ``{"success": True, "keys_sent", "message"}`` or
        ``{"success": False, "error", "message"}``.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    try:
        # Names that are translated to the matching ``Keys`` constant.
        special_names = (
            "ENTER", "TAB", "ESCAPE", "SPACE", "BACKSPACE", "DELETE",
            "ARROW_UP", "ARROW_DOWN", "ARROW_LEFT", "ARROW_RIGHT",
            "HOME", "END", "PAGE_UP", "PAGE_DOWN",
            "F1", "F2", "F3", "F4", "F5", "F6",
            "F7", "F8", "F9", "F10", "F11", "F12",
            "CONTROL", "ALT", "SHIFT",
        )
        special_keys = {name: getattr(Keys, name) for name in special_names}
        key_to_send = special_keys.get(keys.upper(), keys)

        if strategy and value:
            receiver = self._find_element_helper(
                session.driver, strategy, value, 10, element_index
            )
            receiver.send_keys(key_to_send)
        else:
            # No element given: send through an action chain on the driver.
            chain = ActionChains(session.driver)
            chain.send_keys(key_to_send)
            chain.perform()
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "message": f"Error al enviar teclas '{keys}'"
        }

    return {
        "success": True,
        "keys_sent": keys,
        "message": f"Teclas '{keys}' enviadas exitosamente"
    }
def close_browser(self, session_id: str) -> Dict[str, Any]:
    """Close a browser session through the session manager.

    Returns:
        ``{"success": True, "message"}`` when ``close_session`` reports
        success, otherwise ``{"success": False, "error", "message"}``.
    """
    if self.session_manager.close_session(session_id):
        return {
            "success": True,
            "message": "Navegador cerrado exitosamente"
        }

    return {
        "success": False,
        "error": "Sesión no encontrada",
        "message": "No se pudo cerrar el navegador"
    }
def get_page_info(self, session_id: str) -> Dict[str, Any]:
    """Summarise the page currently loaded in the session's browser.

    Returns:
        A dict with ``url``, ``title``, ``page_source_length``,
        ``window_size`` and ``cookies_count`` on success, or
        ``{"success": False, "error", ...}`` when the session is unknown or
        the driver raises.
    """
    session = self.session_manager.get_session(session_id)
    if not session:
        return {"success": False, "error": "Sesión no encontrada"}

    try:
        browser = session.driver
        url = browser.current_url
        title = browser.title
        # Only sizes are reported, not the source or the cookies themselves.
        source_length = len(browser.page_source)
        window = browser.get_window_size()
        cookie_total = len(browser.get_cookies())
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "message": "Error al obtener información de la página"
        }

    return {
        "success": True,
        "url": url,
        "title": title,
        "page_source_length": source_length,
        "window_size": window,
        "cookies_count": cookie_total,
        "message": "Información de página obtenida exitosamente"
    }
def _find_element_helper(self, driver, strategy: str, value: str, timeout: int, element_index: int = 0):
    """Wait for an element matching the locator and return it.

    Args:
        driver: WebDriver to search with.
        strategy: Key of ``self.locator_strategies``.
        value: Locator value used with the strategy.
        timeout: Seconds passed to ``WebDriverWait``.
        element_index: ``0`` waits for a single element; any other value
            waits for all matches and indexes into that list.

    Raises:
        ValueError: If ``strategy`` is not a known strategy name.
        IndexError: If ``element_index`` is at or beyond the number of matches.
    """
    if strategy not in self.locator_strategies:
        raise ValueError(f"Estrategia no válida: {strategy}")

    locator = (self.locator_strategies[strategy], value)
    waiter = WebDriverWait(driver, timeout)

    if element_index != 0:
        candidates = waiter.until(EC.presence_of_all_elements_located(locator))
        if element_index >= len(candidates):
            raise IndexError(f"Índice {element_index} fuera de rango. Elementos encontrados: {len(candidates)}")
        return candidates[element_index]

    return waiter.until(EC.presence_of_element_located(locator))
def _get_element_attributes(self, element: WebElement) -> Dict[str, str]:
    """Collect the non-empty values of a fixed set of common attributes.

    Args:
        element: Element whose ``get_attribute`` is queried.

    Returns:
        Mapping of attribute name to value, containing only the attributes
        for which ``get_attribute`` returned a truthy value.
    """
    attribute_names = ("id", "class", "name", "value", "href", "src", "alt", "title", "type")
    # Query every attribute once, in order, then keep the ones that are set.
    looked_up = ((name, element.get_attribute(name)) for name in attribute_names)
    return {name: found for name, found in looked_up if found}
def handle_popup(
    self,
    session_id: str,
    popup_locator_type: str,
    popup_locator_value: str,
    title_id: str,
    content_id: str,
    take_screenshot: bool = False,
    timeout: int = 5
) -> Dict[str, Any]:
    """Detect a popup and read its title and content.

    Args:
        session_id: Browser session to inspect.
        popup_locator_type: Locator strategy name for the popup
            (case-insensitive).
        popup_locator_value: Locator value for the popup.
        title_id: ``id`` of the element inside the popup holding its title.
        content_id: ``id`` of the element inside the popup holding its content.
        take_screenshot: Save a screenshot when the popup is detected.
        timeout: Seconds to wait for the popup to become visible (default 5,
            the value that used to be hard-coded).

    Returns:
        ``success`` is true both when a popup is found and when none appears
        within ``timeout``; ``popup_detected`` tells them apart. When found,
        the dict also carries ``title``, ``content`` and ``screenshot_path``
        (``None`` unless a screenshot was taken). Failures yield
        ``{"success": False, "message", ...}``.
    """
    # Local import: only this method needs it, for the screenshot location.
    import tempfile

    try:
        session = self.session_manager.get_session(session_id)
        if not session:
            return {
                "success": False,
                "message": f"Sesión {session_id} no encontrada"
            }
        driver = session.driver

        locator_key = popup_locator_type.lower()
        if locator_key not in self.locator_strategies:
            return {
                "success": False,
                "message": f"Tipo de localizador no válido: {popup_locator_type}"
            }
        by_attribute = self.locator_strategies[locator_key]

        # Only the visibility wait may be read as "no popup": a timeout
        # raised by a later step (e.g. the screenshot) is a real error and
        # must not be reported as "nothing detected".
        try:
            popup = WebDriverWait(driver, timeout).until(
                EC.visibility_of_element_located((by_attribute, popup_locator_value))
            )
        except TimeoutException:
            return {
                "success": True,
                "popup_detected": False,
                "message": "No se detectó ningún popup",
                "title": None,
                "content": None
            }

        # A missing title or content element is tolerated; a placeholder
        # text is reported instead.
        try:
            title = popup.find_element(By.ID, title_id).text
        except NoSuchElementException:
            title = "Título no encontrado"
        try:
            content = popup.find_element(By.ID, content_id).text
        except NoSuchElementException:
            content = "Contenido no encontrado"

        screenshot_path = None
        if take_screenshot:
            # Use the platform's temp directory rather than a hard-coded
            # /tmp, which does not exist on every system.
            screenshot_path = os.path.join(
                tempfile.gettempdir(), f"popup_detected_{session_id}.png"
            )
            driver.save_screenshot(screenshot_path)

        return {
            "success": True,
            "popup_detected": True,
            "title": title,
            "content": content,
            "message": f"Popup detectado: {title} - {content}",
            "screenshot_path": screenshot_path
        }
    except NoSuchElementException as e:
        return {
            "success": False,
            "message": f"Error al procesar el popup: {str(e)}",
            "error_type": "element_not_found"
        }
    except Exception as e:
        return {
            "success": False,
            "message": f"Error en handle_popup: {str(e)}",
            "error_type": "general_error"
        }
def reescribir_HTML(
    self,
    session_id: str,
    new_html: str,
    locator_type: str,
    locator_value: str
) -> Dict[str, Any]:
    """Replace an element's ``outerHTML`` with new markup.

    Args:
        session_id: Browser session to operate on.
        new_html: HTML that replaces the located element.
        locator_type: Locator strategy name (case-insensitive).
        locator_value: Locator value of the element to replace.

    Returns:
        ``{"success": True, "message", "new_html"}`` on success, otherwise
        ``{"success": False, "message", ...}`` with an ``error_type`` when an
        exception was caught.
    """
    try:
        session = self.session_manager.get_session(session_id)
        if not session:
            return {
                "success": False,
                "message": f"Sesión {session_id} no encontrada"
            }
        driver = session.driver

        strategy_key = locator_type.lower()
        if strategy_key not in self.locator_strategies:
            return {
                "success": False,
                "message": f"Tipo de localizador no válido: {locator_type}"
            }
        locator = (self.locator_strategies[strategy_key], locator_value)

        # Wait up to 10 seconds for the element to be present in the DOM.
        target = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(locator)
        )

        # The swap is done in the page via JavaScript on the located element.
        driver.execute_script("arguments[0].outerHTML = arguments[1];", target, new_html)

        return {
            "success": True,
            "message": f"HTML reescrito exitosamente en elemento con {locator_type}='{locator_value}'",
            "new_html": new_html
        }
    except (NoSuchElementException, TimeoutException) as exc:
        return {
            "success": False,
            "message": f"Elemento no encontrado: {str(exc)}",
            "error_type": "element_not_found"
        }
    except Exception as exc:
        return {
            "success": False,
            "message": f"Error al reescribir HTML: {str(exc)}",
            "error_type": "general_error"
        }
def is_element_empty(
    self,
    session_id: str,
    locator_type: str,
    locator_value: str
) -> Dict[str, Any]:
    """Report whether an element has no content.

    ``input`` and ``textarea`` elements are judged by their ``value``
    attribute; every other element by its ``text``. Whitespace-only content
    counts as empty.

    Args:
        session_id: Browser session to operate on.
        locator_type: Locator strategy name (case-insensitive).
        locator_value: Locator value of the element to check.

    Returns:
        ``{"success": True, "is_empty", "element_type", "message"}`` on
        success, otherwise ``{"success": False, "message", ...}`` with an
        ``error_type`` when an exception was caught.
    """
    try:
        session = self.session_manager.get_session(session_id)
        if not session:
            return {
                "success": False,
                "message": f"Sesión {session_id} no encontrada"
            }
        driver = session.driver

        strategy_key = locator_type.lower()
        if strategy_key not in self.locator_strategies:
            return {
                "success": False,
                "message": f"Tipo de localizador no válido: {locator_type}"
            }
        locator = (self.locator_strategies[strategy_key], locator_value)

        # Wait up to 10 seconds for the element to be present in the DOM.
        element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(locator)
        )

        tag_name = element.tag_name.lower()
        if tag_name in ("input", "textarea"):
            raw = element.get_attribute("value")
        else:
            raw = element.text
        # Missing, empty and whitespace-only content all count as empty.
        is_empty = not raw or raw.strip() == ""

        return {
            "success": True,
            "is_empty": is_empty,
            "element_type": tag_name,
            "message": f"Elemento {'vacío' if is_empty else 'no vacío'}"
        }
    except (NoSuchElementException, TimeoutException) as exc:
        return {
            "success": False,
            "message": f"Elemento no encontrado: {str(exc)}",
            "error_type": "element_not_found"
        }
    except Exception as exc:
        return {
            "success": False,
            "message": f"Error al verificar si el elemento está vacío: {str(exc)}",
            "error_type": "general_error"
        }