service.py•23.8 kB
# /src/dom/service.py
import gc
import json
import logging
from dataclasses import dataclass
from importlib import resources # Use importlib.resources
from typing import TYPE_CHECKING, Optional, Tuple, Dict, List
import re
# Use relative imports if within the same package structure
from .views import (
    DOMBaseNode,
    DOMElementNode,
    DOMState,
    DOMTextNode,
    SelectorMap,
    ViewportInfo, # Added ViewportInfo here
    CoordinateSet # Added CoordinateSet
)
# Removed utils import assuming time_execution_async is defined elsewhere or removed for brevity
# from ..utils import time_execution_async # Example relative import if utils is one level up
if TYPE_CHECKING:
    from patchright.sync_api import Page # Use sync_api for this repo
logger = logging.getLogger(__name__)
# Decorator placeholder if not using utils.time_execution_async
def time_execution_async(label):
    def decorator(func):
        # In a sync context, this decorator needs adjustment or removal
        # For simplicity here, we'll just make it pass through in the sync version
        def wrapper(*args, **kwargs):
            # logger.debug(f"Executing {label}...") # Basic logging
            result = func(*args, **kwargs)
            # logger.debug(f"Finished {label}.") # Basic logging
            return result
        return wrapper
    return decorator
class DomService:
    def __init__(self, page: 'Page'):
        self.page = page
        self.xpath_cache = {} # Consider if this cache is still needed/used effectively
        # Correctly load JS using importlib.resources relative to this file
        try:
            # Assuming buildDomTree.js is in the same directory 'dom'
            with resources.path(__package__, 'buildDomTree.js') as js_path:
                 self.js_code = js_path.read_text(encoding='utf-8')
            logger.debug("buildDomTree.js loaded successfully.")
        except FileNotFoundError:
             logger.error("buildDomTree.js not found in the 'dom' package directory!")
             raise
        except Exception as e:
             logger.error(f"Error loading buildDomTree.js: {e}", exc_info=True)
             raise
    # region - Clickable elements
    @time_execution_async('--get_clickable_elements')
    def get_clickable_elements(
        self,
        highlight_elements: bool = True,
        focus_element: int = -1,
        viewport_expansion: int = 0,
    ) -> DOMState:
        """Gets interactive elements and DOM structure. Sync version."""
        logger.debug(f"Calling _build_dom_tree with highlight={highlight_elements}, focus={focus_element}, expansion={viewport_expansion}")
        # In sync context, _build_dom_tree should be sync
        element_tree, selector_map = self._build_dom_tree(highlight_elements, focus_element, viewport_expansion)
        return DOMState(element_tree=element_tree, selector_map=selector_map)
    # Removed get_cross_origin_iframes for brevity, can be added back if needed
    # @time_execution_async('--build_dom_tree') # Adjust decorator if needed for sync
    def _build_dom_tree(
        self,
        highlight_elements: bool,
        focus_element: int,
        viewport_expansion: int,
    ) -> Tuple[DOMElementNode, SelectorMap]:
        """Builds the DOM tree by executing JS in the browser. Sync version."""
        logger.debug("Executing _build_dom_tree...")
        if self.page.evaluate('1+1') != 2:
            raise ValueError('The page cannot evaluate javascript code properly')
        if self.page.url == 'about:blank' or self.page.url == '':
            logger.info("Page URL is blank, returning empty DOM structure.")
            # short-circuit if the page is a new empty tab for speed
            return (
                DOMElementNode(
                    tag_name='body',
                    xpath='',
                    attributes={},
                    children=[],
                    is_visible=False,
                    parent=None,
                ),
                {},
            )
        debug_mode = logger.getEffectiveLevel() <= logging.DEBUG
        args = {
            'doHighlightElements': highlight_elements,
            'focusHighlightIndex': focus_element,
            'viewportExpansion': viewport_expansion,
            'debugMode': debug_mode,
        }
        logger.debug(f"Evaluating buildDomTree.js with args: {args}")
        try:
            # Use evaluate() directly in sync context
            eval_page: dict = self.page.evaluate(f"({self.js_code})", args)
        except Exception as e:
            logger.error(f"Error evaluating buildDomTree.js: {type(e).__name__}: {e}", exc_info=False) # Less verbose logging
            logger.debug(f"JS Code Snippet (first 500 chars):\n{self.js_code[:500]}...") # Log JS snippet on error
            # Try to get page state for context
            try:
                 page_url = self.page.url
                 page_title = self.page.title()
                 logger.error(f"Error occurred on page: URL='{page_url}', Title='{page_title}'")
            except Exception as page_state_e:
                 logger.error(f"Could not get page state after JS error: {page_state_e}")
            raise RuntimeError(f"Failed to evaluate DOM building script: {e}") from e # Re-raise a standard error
        # Only log performance metrics in debug mode
        if debug_mode and 'perfMetrics' in eval_page:
             logger.debug(
                 'DOM Tree Building Performance Metrics for: %s\n%s',
                 self.page.url,
                 json.dumps(eval_page['perfMetrics'], indent=2),
             )
        if 'map' not in eval_page or 'rootId' not in eval_page:
            logger.error(f"Invalid structure returned from buildDomTree.js: Missing 'map' or 'rootId'. Response keys: {eval_page.keys()}")
            # Log more details if possible
            logger.error(f"JS Eval Response Snippet: {str(eval_page)[:1000]}...")
            # Return empty structure to prevent downstream errors
            return (DOMElementNode(tag_name='body', xpath='', attributes={}, children=[], is_visible=False, parent=None), {})
            # raise ValueError("Invalid structure returned from DOM building script.")
        # Use sync _construct_dom_tree
        return self._construct_dom_tree(eval_page)
    # @time_execution_async('--construct_dom_tree') # Adjust decorator if needed for sync
    def _construct_dom_tree(
        self,
        eval_page: dict,
    ) -> Tuple[DOMElementNode, SelectorMap]:
        """Constructs the Python DOM tree from the JS map. Sync version."""
        logger.debug("Constructing Python DOM tree from JS map...")
        js_node_map = eval_page['map']
        js_root_id = eval_page.get('rootId') # Use .get for safety
        if js_root_id is None:
             logger.error("JS evaluation result missing 'rootId'. Cannot build tree.")
             # Return empty structure
             return (DOMElementNode(tag_name='body', xpath='', attributes={}, children=[], is_visible=False, parent=None), {})
        selector_map: SelectorMap = {}
        node_map: Dict[str, DOMBaseNode] = {} # Use string keys consistently
        # Iterate through the JS map provided by the browser script
        for id_str, node_data in js_node_map.items():
            if not isinstance(node_data, dict):
                 logger.warning(f"Skipping invalid node data (not a dict) for ID: {id_str}")
                 continue
            node, children_ids_str = self._parse_node(node_data)
            if node is None:
                continue # Skip nodes that couldn't be parsed
            node_map[id_str] = node # Store with string ID
            # If the node is an element node with a highlight index, add it to the selector map
            if isinstance(node, DOMElementNode) and node.highlight_index is not None:
                selector_map[node.highlight_index] = node
            # Link children to this node if it's an element node
            if isinstance(node, DOMElementNode):
                for child_id_str in children_ids_str:
                    child_node = node_map.get(child_id_str) # Use .get() for safety
                    if child_node:
                        # Set the parent reference on the child node
                        child_node.parent = node
                        # Add the child node to the current node's children list
                        node.children.append(child_node)
                    else:
                        # This can happen if a child node was invalid or filtered out
                        logger.debug(f"Child node with ID '{child_id_str}' not found in node_map while processing parent '{id_str}'.")
        # Retrieve the root node using the root ID from the evaluation result
        root_node = node_map.get(str(js_root_id))
        # Clean up large intermediate structures
        del node_map
        del js_node_map
        gc.collect()
        # Validate the root node
        if root_node is None or not isinstance(root_node, DOMElementNode):
            logger.error(f"Failed to find valid root DOMElementNode with ID '{js_root_id}'.")
            # Return a default empty body node to avoid crashes
            return (DOMElementNode(tag_name='body', xpath='', attributes={}, children=[], is_visible=False, parent=None), selector_map)
        logger.debug("Finished constructing Python DOM tree.")
        return root_node, selector_map
    def _parse_node(
        self,
        node_data: dict,
    ) -> Tuple[Optional[DOMBaseNode], List[str]]: # Return string IDs
        """Parses a single node dictionary from JS into a Python DOM object. Sync version."""
        if not node_data:
            return None, []
        node_type = node_data.get('type') # Check if it's explicitly a text node
        if node_type == 'TEXT_NODE':
             # Handle Text Nodes
             text = node_data.get('text', '')
             if not text: # Skip empty text nodes early
                  return None, []
             text_node = DOMTextNode(
                 text=text,
                 is_visible=node_data.get('isVisible', False), # Use .get for safety
                 parent=None, # Parent set later during construction
             )
             return text_node, []
        elif 'tagName' in node_data:
             # Handle Element Nodes
             tag_name = node_data['tagName']
             # Process coordinates if they exist (using Pydantic models from view)
             page_coords_data = node_data.get('pageCoordinates')
             viewport_coords_data = node_data.get('viewportCoordinates')
             viewport_info_data = node_data.get('viewportInfo')
             page_coordinates = CoordinateSet(**page_coords_data) if page_coords_data else None
             viewport_coordinates = CoordinateSet(**viewport_coords_data) if viewport_coords_data else None
             viewport_info = ViewportInfo(**viewport_info_data) if viewport_info_data else None
             element_node = DOMElementNode(
                 tag_name=tag_name.lower(), # Ensure lowercase
                 xpath=node_data.get('xpath', ''),
                 attributes=node_data.get('attributes', {}),
                 children=[], # Children added later
                 is_visible=node_data.get('isVisible', False),
                 is_interactive=node_data.get('isInteractive', False),
                 is_top_element=node_data.get('isTopElement', False),
                 is_in_viewport=node_data.get('isInViewport', False),
                 highlight_index=node_data.get('highlightIndex'), # Can be None
                 shadow_root=node_data.get('shadowRoot', False),
                 parent=None, # Parent set later
                 # Add coordinate fields
                 page_coordinates=page_coordinates,
                 viewport_coordinates=viewport_coordinates,
                 viewport_info=viewport_info,
                 # Enhanced CSS selector added later if needed
                 css_selector=None,
             )
             # Children IDs are strings from the JS map
             children_ids_str = node_data.get('children', [])
             # Basic validation
             if not isinstance(children_ids_str, list):
                 logger.warning(f"Invalid children format for node {node_data.get('xpath')}, expected list, got {type(children_ids_str)}. Treating as empty.")
                 children_ids_str = []
             return element_node, [str(cid) for cid in children_ids_str] # Ensure IDs are strings
        else:
             # Skip nodes that are neither TEXT_NODE nor have a tagName (e.g., comments processed out by JS)
             logger.debug(f"Skipping node data without 'type' or 'tagName': {str(node_data)[:100]}...")
             return None, []
    # Add the helper to generate enhanced CSS selectors (adapted from BrowserContext)
    # This could also live in a dedicated selector utility class/module
    @staticmethod
    def _enhanced_css_selector_for_element(element: DOMElementNode) -> str:
        """
        Generates a more robust CSS selector, prioritizing stable attributes.
        RECORDER FOCUS: Prioritize ID, data-testid, name, stable classes. Fallback carefully.
        """
        if not isinstance(element, DOMElementNode):
            return ''
        # Escape CSS identifiers (simple version, consider edge cases)
        def escape_css(value):
            if not value: return ''
            # Basic escape for characters that are problematic in unquoted identifiers/strings
            # See: https://developer.mozilla.org/en-US/docs/Web/CSS/string#escaping_characters
            # This is NOT exhaustive but covers common cases.
            return re.sub(r'([!"#$%&\'()*+,./:;<=>?@\[\\\]^`{|}~])', r'\\\1', value)
        # --- Attribute Priority Order ---
        # 1. ID (if reasonably unique-looking)
        if 'id' in element.attributes and element.attributes['id']:
            element_id = element.attributes['id'].strip()
            if element_id and not element_id.isdigit() and ' ' not in element_id and ':' not in element_id:
                 escaped_id = escape_css(element_id)
                 selector = f"#{escaped_id}"
                 # If ID seems generic, add tag name
                 if len(element_id) < 6 and element.tag_name not in ['div', 'span']: # Don't add for generic containers unless ID is short
                     return f"{element.tag_name}{selector}"
                 return selector
        # 2. Stable Data Attributes
        for test_attr in ['data-testid', 'data-test-id', 'data-cy', 'data-qa']:
            if test_attr in element.attributes and element.attributes[test_attr]:
                val = element.attributes[test_attr].strip()
                if val:
                     escaped_val = escape_css(val)
                     selector = f"[{test_attr}='{escaped_val}']"
                     # Add tag name if value seems generic
                     if len(val) < 5:
                         return f"{element.tag_name}{selector}"
                     return selector
        # 3. Name Attribute
        if 'name' in element.attributes and element.attributes['name']:
             name_val = element.attributes['name'].strip()
             if name_val:
                  escaped_name = escape_css(name_val)
                  selector = f"{element.tag_name}[name='{escaped_name}']"
                  return selector
        # 4. Aria-label
        if 'aria-label' in element.attributes and element.attributes['aria-label']:
             aria_label = element.attributes['aria-label'].strip()
             # Ensure label is reasonably specific (not just whitespace or very short)
             if aria_label and len(aria_label) > 2 and len(aria_label) < 80:
                  escaped_label = escape_css(aria_label)
                  selector = f"{element.tag_name}[aria-label='{escaped_label}']"
                  return selector
        # 5. Placeholder (for inputs)
        if element.tag_name == 'input' and 'placeholder' in element.attributes and element.attributes['placeholder']:
             placeholder = element.attributes['placeholder'].strip()
             if placeholder:
                  escaped_placeholder = escape_css(placeholder)
                  selector = f"input[placeholder='{escaped_placeholder}']"
                  return selector
        # --- Text Content Strategy (Use cautiously) ---
        # Get DIRECT, visible text content of the element itself
        direct_text = ""
        if element.is_visible: # Only consider text if element is visible
            texts = []
            for child in element.children:
                if isinstance(child, DOMTextNode) and child.is_visible:
                    texts.append(child.text.strip())
            direct_text = ' '.join(filter(None, texts)).strip()
        # 6. Specific Text Content (if short, unique-looking, and element type is suitable)
        suitable_text_tags = {'button', 'a', 'span', 'label', 'legend', 'h1', 'h2', 'h3', 'h4', 'p', 'li', 'td', 'th', 'dt', 'dd'}
        if direct_text and element.tag_name in suitable_text_tags and 2 < len(direct_text) < 60: # Avoid overly long or short text
             # Basic check for uniqueness (could be improved by checking siblings)
             # Check if it looks like dynamic content (e.g., numbers only, dates) - skip if so
             if not direct_text.isdigit() and not re.match(r'^\$?[\d,.]+$', direct_text): # Avoid pure numbers/prices
                # Use Playwright's text selector (escapes internally)
                # Note: This requires Playwright >= 1.15 or so for :text pseudo-class
                # Using :has-text is generally safer as it looks within descendants too,
                # but here we specifically want the *direct* text match.
                # Let's try combining tag and text for specificity.
                # Playwright handles quotes inside the text automatically.
                selector = f"{element.tag_name}:text-is('{direct_text}')"
                # Alternative: :text() - might be less strict about whitespace
                # selector = f"{element.tag_name}:text('{direct_text}')"
                # Let's try to validate this selector immediately if possible (costly)
                # For now, return it optimistically.
                return selector
        # --- Fallbacks (Structure and Class) ---
        base_selector = element.tag_name
        stable_classes_used = []
        # 7. Stable Class Names (Filter more strictly)
        if 'class' in element.attributes and element.attributes['class']:
            classes = element.attributes['class'].strip().split()
            stable_classes = [
                c for c in classes
                if c and not c.isdigit() and
                   not re.search(r'\d', c) and # No digits at all
                   not re.match(r'.*(--|__|is-|has-|js-|active|selected|disabled|hidden).*', c, re.IGNORECASE) and # Avoid common states/modifiers/js
                   not re.match(r'^[a-zA-Z]{1,2}$', c) and # Avoid 1-2 letter classes (often layout helpers)
                   len(c) > 2 and len(c) < 30 # Reasonable length
            ]
            if stable_classes:
                stable_classes.sort()
                stable_classes_used = stable_classes # Store for nth-of-type check
                base_selector += '.' + '.'.join(escape_css(c) for c in stable_classes)
        # --- Ancestor Context (Find nearest stable ancestor) ---
        # Try to find a parent with ID or data-testid to anchor the selector
        stable_ancestor_selector = None
        current = element.parent
        depth = 0
        max_depth = 4 # How far up to look for an anchor
        while current and depth < max_depth:
            ancestor_selector_part = None
            if 'id' in current.attributes and current.attributes['id']:
                 ancestor_id = current.attributes['id'].strip()
                 if ancestor_id and not ancestor_id.isdigit() and ' ' not in ancestor_id:
                      ancestor_selector_part = f"#{escape_css(ancestor_id)}"
            elif not ancestor_selector_part: # Check testid only if ID not found
                for test_attr in ['data-testid', 'data-test-id']:
                    if test_attr in current.attributes and current.attributes[test_attr]:
                        val = current.attributes[test_attr].strip()
                        if val:
                            ancestor_selector_part = f"[{test_attr}='{escape_css(val)}']"
                            break # Found one
            # If we found a stable part for the ancestor, use it
            if ancestor_selector_part:
                stable_ancestor_selector = ancestor_selector_part
                break # Stop searching up
            current = current.parent
            depth += 1
        # Combine ancestor and base selector if ancestor found
        final_selector = f"{stable_ancestor_selector} >> {base_selector}" if stable_ancestor_selector else base_selector
        # 8. Add :nth-of-type ONLY if multiple siblings match the current selector AND no unique attribute/text was found
        # This check becomes more complex with the ancestor path. We simplify here.
        # Only add nth-of-type if we didn't find a unique ID/testid/name/text for the element itself.
        needs_disambiguation = (stable_ancestor_selector is None) and \
                               (base_selector == element.tag_name or base_selector.startswith(element.tag_name + '.')) # Only tag or tag+class
        if needs_disambiguation and element.parent:
            try:
                # Find siblings matching the base selector part (tag + potentially classes)
                matching_siblings = []
                for sib in element.parent.children:
                    if isinstance(sib, DOMElementNode) and sib.tag_name == element.tag_name:
                        # Check classes if they were used in the base selector
                        if stable_classes_used:
                            if DomService._check_classes_match(sib, stable_classes_used):
                                matching_siblings.append(sib)
                        else: # No classes used, just match tag
                            matching_siblings.append(sib)
                if len(matching_siblings) > 1:
                     try:
                          index = matching_siblings.index(element) + 1
                          final_selector += f':nth-of-type({index})'
                     except ValueError:
                          logger.warning(f"Element not found in its own filtered sibling list for nth-of-type. Selector: {final_selector}")
            except Exception as e:
                logger.warning(f"Error during nth-of-type calculation: {e}. Selector: {final_selector}")
        # 9. FINAL FALLBACK: Use original XPath if selector is still not specific
        if final_selector == element.tag_name and element.xpath:
             logger.warning(f"Selector for {element.tag_name} is just the tag. Falling back to XPath: {element.xpath}")
             # Returning XPath directly might cause issues if executor expects CSS.
             # Playwright can handle css=<xpath>, so let's return that.
             return f"xpath={element.xpath}"
        return final_selector
    
    @staticmethod
    def _check_classes_match(element: DOMElementNode, required_classes: List[str]) -> bool:
        """Helper to check if an element has all the required classes."""
        if 'class' not in element.attributes or not element.attributes['class']:
            return False
        element_classes = set(element.attributes['class'].strip().split())
        return all(req_class in element_classes for req_class in required_classes)