Skip to main content
Glama

Web-QA

by GroundNG
recorder_agent.py183 kB
# /src/recorder_agent.py import json from importlib import resources import logging import time import re from patchright.sync_api import Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError from typing import Dict, Any, Optional, List, Tuple, Union, Literal import random import os import threading # For timer from datetime import datetime, timezone from pydantic import BaseModel, Field from PIL import Image import io # Use relative imports within the package from ..browser.browser_controller import BrowserController from ..browser.panel.panel import Panel from ..llm.llm_client import LLMClient from ..core.task_manager import TaskManager from ..dom.views import DOMState, DOMElementNode, SelectorMap # Import DOM types # Configure logger logger = logging.getLogger(__name__) # --- Recorder Settings --- INTERACTIVE_TIMEOUT_SECS = 0 # Time for user to override AI suggestion DEFAULT_WAIT_AFTER_ACTION = 0.5 # Default small wait added after recorded actions # --- End Recorder Settings --- class PlanSubtasksSchema(BaseModel): """Schema for the planned subtasks list.""" planned_steps: List[str] = Field(..., description="List of planned test step descriptions as strings.") class LLMVerificationParamsSchema(BaseModel): """Schema for parameters within a successful verification.""" expected_text: Optional[str] = Field(None, description="Expected text for equals/contains assertions.") attribute_name: Optional[str] = Field(None, description="Attribute name for attribute_equals assertion.") expected_value: Optional[str] = Field(None, description="Expected value for attribute_equals assertion.") expected_count: Optional[int] = Field(None, description="Expected count for element_count assertion.") class LLMVerificationSchema(BaseModel): """Schema for the result of an LLM verification step.""" verified: bool = Field(..., description="True if the condition is met, False otherwise.") assertion_type: Optional[Literal[ 'assert_text_equals', 'assert_text_contains', 'assert_visible', 'assert_llm_verification', 'assert_hidden', 'assert_attribute_equals', 'assert_element_count', 'assert_checked', 'assert_enabled', 'assert_disabled', 'assert_not_checked' ]] = Field(None, description="Required if verified=true. Type of assertion suggested, reflecting the *actual observed state*.") element_index: Optional[int] = Field(None, description="Index of the *interactive* element [index] from context that might *also* relate to the verification (e.g., the button just clicked), if applicable. Set to null if verification relies solely on a static element or non-indexed element.") verification_selector: Optional[str] = Field(None, description="Final CSS selector for the verifying element (generated by the system based on index or static_id). LLM should output null for this field.") verification_static_id: Optional[str] = Field(None, description="Temporary ID (e.g., 's12') of the static element from context that confirms the verification, if applicable. Use this *instead* of verification_selector for static elements.") parameters: Optional[LLMVerificationParamsSchema] = Field(default_factory=dict, description="Parameters for the assertion based on the *actual observed state*. Required if assertion type needs params (e.g., assert_text_equals).") reasoning: str = Field(..., description="Explanation for the verification result, explaining how the intent is met or why it failed. 
If verified=true, justify the chosen selector and parameters.") class ReplanSchema(BaseModel): """Schema for recovery steps or abort action during re-planning.""" recovery_steps: Optional[List[str]] = Field(None, description="List of recovery step descriptions (1-3 steps), if recovery is possible.") action: Optional[Literal["abort"]] = Field(None, description="Set to 'abort' if recovery is not possible/safe.") reasoning: Optional[str] = Field(None, description="Reasoning, especially required if action is 'abort'.") class RecorderSuggestionParamsSchema(BaseModel): """Schema for parameters within a recorder action suggestion.""" index: Optional[int] = Field(None, description="Index of the target element from context (required for click/type/check/uncheck/key_press/drag_and_drop source).") keys: Optional[str] = Field(None, description="Key(s) to press (required for key_press action). E.g., 'Enter', 'Control+A'.") destination_index: Optional[int] = Field(None, description="Index of the target element for drag_and_drop action.") option_label: Optional[str] = Field(None, description="Visible text/label of the option to select (**required for select action**)") text: Optional[str] = Field(None, description="Text to type (required for type action).") class RecorderSuggestionSchema(BaseModel): """Schema for the AI's suggestion for a click/type action during recording.""" action: Literal["click", "type", "select", "check", "uncheck", "key_press", "drag_and_drop", "action_not_applicable", "suggestion_failed"] = Field(..., description="The suggested browser action or status.") parameters: RecorderSuggestionParamsSchema = Field(default_factory=dict, description="Parameters for the action (index, text, option_label).") reasoning: str = Field(..., description="Explanation for the suggestion.") class AssertionTargetIndexSchema(BaseModel): """Schema for identifying the target element index for a manual assertion.""" index: Optional[int] = Field(None, description="Index of the most relevant element from context, or null if none found/identifiable.") reasoning: Optional[str] = Field(None, description="Reasoning, especially if index is null.") class WebAgent: """ Orchestrates AI-assisted web test recording, generating reproducible test scripts. Can also function in a (now legacy) execution mode. """ def __init__(self, llm_client: LLMClient, headless: bool = True, # Note: Recorder mode forces non-headless max_iterations: int = 50, # Max planned steps to process in recorder max_history_length: int = 10, max_retries_per_subtask: int = 1, # Retries for *AI suggestion* or failed *execution* during recording max_extracted_data_history: int = 7, # Less relevant for recorder? Keep for now. is_recorder_mode: bool = False, automated_mode: bool = False, filename: str = "", baseline_dir: str = "./visual_baselines"): self.llm_client = llm_client self.is_recorder_mode = is_recorder_mode self.baseline_dir = os.path.abspath(baseline_dir) # Store baseline dir # Ensure baseline dir exists during initialization try: os.makedirs(self.baseline_dir, exist_ok=True) logger.info(f"Visual baseline directory set to: {self.baseline_dir}") except OSError as e: logger.warning(f"Could not create baseline directory '{self.baseline_dir}': {e}. 
Baseline saving might fail.") # Determine effective headless: Recorder forces non-headless unless automated effective_headless = headless if self.is_recorder_mode and not automated_mode: effective_headless = False # Interactive recording needs visible browser if headless: logger.warning("Interactive Recorder mode initiated, but headless=True was requested. Forcing headless=False.") elif automated_mode and not headless: logger.info("Automated mode running with visible browser (headless=False).") self.browser_controller = BrowserController(headless=effective_headless, auth_state_path='_'.join([filename, "auth_state.json"])) self.panel = Panel() # TaskManager manages the *planned* steps generated by LLM initially self.task_manager = TaskManager(max_retries_per_subtask=max_retries_per_subtask) self.history: List[Dict[str, Any]] = [] self.extracted_data_history: List[Dict[str, Any]] = [] # Keep for potential context, but less critical now self.max_iterations = max_iterations # Limit for planned steps processing self.max_history_length = max_history_length self.max_extracted_data_history = max_extracted_data_history self.output_file_path: Optional[str] = None # Path for the recorded JSON self.file_name = filename self.feature_description: Optional[str] = None self._latest_dom_state: Optional[DOMState] = None self._consecutive_suggestion_failures = 0 # Track failures for the *same* step index self._last_failed_step_index = -1 # Track which step index had the last failure self._last_static_id_map: Dict[str, 'DOMElementNode'] = {} # --- Recorder Specific State --- self.recorded_steps: List[Dict[str, Any]] = [] self._current_step_id = 1 # Counter for recorded steps self._user_abort_recording = False # --- End Recorder Specific State --- self.automated_mode = automated_mode # Log effective mode automation_status = "Automated" if self.automated_mode else "Interactive" logger.info(f"WebAgent (Recorder Mode / {automation_status}) initialized (headless={effective_headless}, max_planned_steps={max_iterations}, max_hist={max_history_length}, max_retries={max_retries_per_subtask}).") logger.info(f"Visual baseline directory: {self.baseline_dir}") def _add_to_history(self, entry_type: str, data: Any): """Adds an entry to the agent's history, maintaining max length.""" timestamp = time.strftime("%Y-%m-%d %H:%M:%S") log_data_str = "..." try: # Basic sanitization (same as before) if isinstance(data, dict): log_data = {k: (str(v)[:200] + '...' if len(str(v)) > 200 else v) for k, v in data.items()} elif isinstance(data, (str, bytes)): log_data = str(data[:297]) + "..." if len(data) > 300 else str(data) else: log_data = data log_data_str = str(log_data) if len(log_data_str) > 300: log_data_str = log_data_str[:297]+"..." except Exception as e: logger.warning(f"Error sanitizing history data: {e}") log_data = f"Error processing data: {e}" log_data_str = log_data entry = {"timestamp": timestamp, "type": entry_type, "data": log_data} self.history.append(entry) if len(self.history) > self.max_history_length: self.history.pop(0) logger.debug(f"[HISTORY] Add: {entry_type} - {log_data_str}") def _get_history_summary(self) -> str: """Provides a concise summary of the recent history for the LLM.""" if not self.history: return "No history yet." summary = "Recent History (Oldest First):\n" for entry in self.history: entry_data_str = str(entry['data']) if len(entry_data_str) > 300: entry_data_str = entry_data_str[:297] + "..." 
summary += f"- [{entry['type']}] {entry_data_str}\n" return summary.strip() def _clean_llm_response_to_json(self, llm_output: str) -> Optional[Dict[str, Any]]: """Attempts to extract and parse JSON from the LLM's output.""" logger.debug(f"[LLM PARSE] Attempting to parse LLM response (length: {len(llm_output)}).") match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", llm_output, re.DOTALL | re.IGNORECASE) if match: json_str = match.group(1).strip() logger.debug(f"[LLM PARSE] Extracted JSON from markdown block.") else: start_index = llm_output.find('{') end_index = llm_output.rfind('}') if start_index != -1 and end_index != -1 and end_index > start_index: json_str = llm_output[start_index : end_index + 1].strip() logger.debug(f"[LLM PARSE] Attempting to parse extracted JSON between {{ and }}.") else: logger.warning("[LLM PARSE] Could not find JSON structure in LLM output.") self._add_to_history("LLM Parse Error", {"reason": "No JSON structure found", "raw_output_snippet": llm_output[:200]}) return None # Pre-processing (same as before) try: def escape_quotes_replacer(match): key_part, colon_part, open_quote, value, close_quote = match.groups() escaped_value = re.sub(r'(?<!\\)"', r'\\"', value) return f'{key_part}{colon_part}{open_quote}{escaped_value}{close_quote}' keys_to_escape = ["selector", "text", "reasoning", "url", "result", "answer", "reason", "file_path", "expected_text", "attribute_name", "expected_value"] pattern_str = r'(\"(?:' + '|'.join(keys_to_escape) + r')\")(\s*:\s*)(\")(.*?)(\")' pattern = re.compile(pattern_str, re.DOTALL) json_str = pattern.sub(escape_quotes_replacer, json_str) json_str = json_str.replace('\\\\n', '\\n').replace('\\n', '\n') json_str = json_str.replace('\\\\"', '\\"') json_str = json_str.replace('\\\\t', '\\t') json_str = re.sub(r',\s*([\}\]])', r'\1', json_str) except Exception as clean_e: logger.warning(f"[LLM PARSE] Error during pre-parsing cleaning: {clean_e}") # Attempt Parsing (check for 'action' primarily, parameters might be optional for some recorder actions) try: parsed_json = json.loads(json_str) if isinstance(parsed_json, dict) and "action" in parsed_json: # Parameters might not always be present (e.g., simple scroll) if "parameters" not in parsed_json: parsed_json["parameters"] = {} # Ensure parameters key exists logger.debug(f"[LLM PARSE] Successfully parsed action JSON: {parsed_json}") return parsed_json else: logger.warning(f"[LLM PARSE] Parsed JSON missing 'action' key or is not a dict: {parsed_json}") self._add_to_history("LLM Parse Error", {"reason": "Missing 'action' key", "parsed_json": parsed_json, "cleaned_json_string": json_str[:200]}) return None except json.JSONDecodeError as e: logger.error(f"[LLM PARSE] Failed to decode JSON from LLM output: {e}") logger.error(f"[LLM PARSE] Faulty JSON string snippet (around pos {e.pos}): {json_str[max(0, e.pos-50):e.pos+50]}") self._add_to_history("LLM Parse Error", {"reason": f"JSONDecodeError: {e}", "error_pos": e.pos, "json_string_snippet": json_str[max(0, e.pos-50):e.pos+50]}) return None except Exception as e: logger.error(f"[LLM PARSE] Unexpected error during final JSON parsing: {e}", exc_info=True) return None def _plan_subtasks(self, feature_description: str): """Uses the LLM to break down the feature test into planned steps using generate_json.""" logger.info(f"Planning test steps for feature: '{feature_description}'") self.feature_description = feature_description # --- Prompt Construction (Adjusted for generate_json) --- prompt = f""" You are an AI Test Engineer planning steps for 
recording. Given the feature description: "{feature_description}" Break this down into a sequence of specific browser actions or verification checks. Each step should be a single instruction (e.g., "Navigate to...", "Click the 'Submit' button", "Type 'testuser' into username field", "Verify text 'Success' is visible"). The recorder agent will handle identifying elements and generating selectors based on these descriptions. **Key Types of Steps to Plan:** 1. **Navigation:** `Navigate to https://example.com/login` 2. **Action:** `Click element 'Submit Button'` or `Type 'testuser' into element 'Username Input'` or `Check 'male' radio button or Check 'Agree to terms & conditions'` or `Uncheck the 'subscribe to newsletter' checkbox` (Describe the element clearly). **IMPORTANT for Dropdowns (<select>):** If the task involves selecting an option (e.g., "Select 'Canada' from the 'Country' dropdown"), generate a **SINGLE step** like: `Select option 'Canada' in element 'Country Dropdown'` (Describe the main `<select>` element and the option's visible text/label). 3. **Key Press:** `Press 'Enter' key on element 'Search Input'`, `Press 'Tab' key`. (Specify the element if the press is targeted, otherwise it might be global). 4. **Drag and Drop:** `Drag element 'Item A' onto element 'Cart Area'`. (Clearly describe source and target). 5. **Wait:** `Wait for 5 seconds` 6. **Verification:** Phrase as a check. The recorder will prompt for specifics. - `Verify 'Login Successful' message is present` - `Verify 'Cart Count' shows 1` - `Verify 'Submit' button is disabled` - **GOOD:** `Verify login success indicator is visible` (More general) - **AVOID:** `Verify text 'Welcome John Doe!' is visible` (Too specific if name changes) 7. **Scrolling:** `Scroll down` (if content might be off-screen) 8. **Visual Baseline Capture:** If the feature description implies capturing a visual snapshot at a key state (e.g., after login, after adding to cart), use: `Visually baseline the [short description of state]`. Examples: `Visually baseline the login page`, `Visually baseline the dashboard after login`, `Visually baseline the product details page`. **CRITICAL:** Focus on the *intent* of each step. Do NOT include specific selectors or indices in the plan. The recorder determines those interactively. **Output Format:** Respond with a JSON object conforming to the following structure: {{ "planned_steps": ["Step 1 description", "Step 2 description", ...] }} Example Test Case: "Test login on example.com with user 'tester' and pass 'pwd123', then verify the welcome message 'Welcome, tester!' is shown." Example JSON Output Structure: {{ "planned_steps": [ "Navigate to https://example.com/login", "Type 'tester' into element 'username input field'", "Type 'pwd123' into element 'password input field'", "Click element 'login button'", "Verify 'Welcome, tester!' 
message is present" "Visually baseline the user dashboard" ] }} Now, generate the JSON object containing the planned steps for: "{feature_description}" """ # --- End Prompt --- logger.debug(f"[TEST PLAN] Sending Planning Prompt (snippet):\n{prompt[:500]}...") response_obj = self.llm_client.generate_json(PlanSubtasksSchema, prompt) subtasks = None raw_response_for_history = "N/A (Used generate_json)" if isinstance(response_obj, PlanSubtasksSchema): logger.debug(f"[TEST PLAN] LLM JSON response parsed successfully: {response_obj}") # Validate the parsed list if isinstance(response_obj.planned_steps, list) and all(isinstance(s, str) and s for s in response_obj.planned_steps): subtasks = response_obj.planned_steps logger.info(f"Subtasks: {subtasks}") else: logger.warning(f"[TEST PLAN] Parsed JSON planned_steps is not a list of non-empty strings: {response_obj.planned_steps}") raw_response_for_history = f"Parsed object invalid content: {response_obj}" # Log the invalid object elif isinstance(response_obj, str): # Handle error string from generate_json logger.error(f"[TEST PLAN] Failed to generate/parse planned steps JSON from LLM: {response_obj}") raw_response_for_history = response_obj[:500]+"..." else: # Handle unexpected return type logger.error(f"[TEST PLAN] Unexpected response type from generate_json: {type(response_obj)}") raw_response_for_history = f"Unexpected type: {type(response_obj)}" # --- Update Task Manager --- if subtasks and len(subtasks) > 0: self.task_manager.add_subtasks(subtasks) # TaskManager stores the *planned* steps self._add_to_history("Test Plan Created", {"feature": feature_description, "steps": subtasks}) logger.info(f"Successfully planned {len(subtasks)} test steps.") logger.debug(f"[TEST PLAN] Planned Steps: {subtasks}") else: logger.error("[TEST PLAN] Failed to generate or parse valid planned steps from LLM response.") # Use the captured raw_response_for_history which contains error details self._add_to_history("Test Plan Failed", {"feature": feature_description, "raw_response": raw_response_for_history}) raise ValueError("Failed to generate a valid test plan from the feature description.") def _save_visual_baseline(self, baseline_id: str, screenshot_bytes: bytes, selector: Optional[str] = None) -> bool: """Saves the screenshot and metadata for a visual baseline.""" if not screenshot_bytes: logger.error(f"Cannot save baseline '{baseline_id}', no screenshot bytes provided.") return False image_path = os.path.join(self.baseline_dir, f"{baseline_id}.png") metadata_path = os.path.join(self.baseline_dir, f"{baseline_id}.json") # --- Prevent Overwrite (Optional - Prompt user or fail) --- if os.path.exists(image_path) or os.path.exists(metadata_path): if self.automated_mode: logger.warning(f"Baseline '{baseline_id}' already exists. Overwriting in automated mode.") # Allow overwrite in automated mode else: # Interactive mode overwrite = input(f"Baseline '{baseline_id}' already exists. Overwrite? (y/N) > ").strip().lower() if overwrite != 'y': logger.warning(f"Skipping baseline save for '{baseline_id}' - user chose not to overwrite.") return False # Indicate skipped save # --- End Overwrite Check --- try: # 1. Save Image img = Image.open(io.BytesIO(screenshot_bytes)) img.save(image_path, format='PNG') logger.info(f"Saved baseline image to: {image_path}") # 2. 
Gather Metadata current_url = self.browser_controller.get_current_url() viewport_size = self.browser_controller.get_viewport_size() browser_info = self.browser_controller.get_browser_version() os_info = self.browser_controller.get_os_info() timestamp = datetime.now(timezone.utc).isoformat() metadata = { "baseline_id": baseline_id, "image_file": os.path.basename(image_path), # Store relative path "created_at": timestamp, "updated_at": timestamp, # Same initially "url_captured": current_url, "viewport_size": viewport_size, "browser_info": browser_info, "os_info": os_info, "selector_captured": selector # Store selector if it was an element capture } # 3. Save Metadata with open(metadata_path, 'w', encoding='utf-8') as f: json.dump(metadata, f, indent=2, ensure_ascii=False) logger.info(f"Saved baseline metadata to: {metadata_path}") return True # Success except Exception as e: logger.error(f"Error saving baseline '{baseline_id}' (Image: {image_path}, Meta: {metadata_path}): {e}", exc_info=True) # Clean up potentially partially saved files if os.path.exists(image_path): os.remove(image_path) if os.path.exists(metadata_path): os.remove(metadata_path) return False # Failure def _get_llm_verification(self, verification_description: str, current_url: str, dom_context_str: str, static_id_map: Dict[str, Any], screenshot_bytes: Optional[bytes] = None, previous_error: Optional[str] = None ) -> Optional[Dict[str, Any]]: """ Uses LLM's generate_json (potentially multimodal) to verify if a condition is met. Returns a dictionary representation of the result or None on error. """ logger.info(f"Requesting LLM verification (using generate_json) for: '{verification_description}'") logger.info(f"""dom_context_str: {dom_context_str} """) # --- Prompt Adjustment for generate_json --- prompt = f""" You are an AI Test Verification Assistant. Your task is to determine if a specific condition, **or its clear intent**, is met based on the current web page state and If the goal IS met, propose the **most robust and specific, deterministic assertion** possible to confirm this state. **Overall Goal:** {self.feature_description} **Verification Step:** {verification_description} **Current URL:** {current_url} {f"\n**Previous Attempt Feedback:**\nA previous verification attempt for this step resulted in an error: {previous_error}\nPlease carefully re-evaluate the current state and suggest a correct and verifiable assertion, or indicate if the verification still fails.\n" if previous_error else ""} **Input Context (Visible Elements with Indices for Interactive ones):** This section shows visible elements on the page. - Interactive elements are marked with `[index]` (e.g., `[5]<button>Submit</button>`). - Static elements crucial for context are marked with `(Static)` (e.g., `<p (Static)>Login Successful!</p>`). - Some plain static elements may include a hint about their parent, like `(inside: <div id="summary">)`, to help locate them. - Some may be not visible but are interactive, assertable ```html {dom_context_str} ``` {f"**Screenshot Analysis:** Please analyze the attached screenshot for visual confirmation or contradiction of the verification step." if screenshot_bytes else "**Note:** No screenshot provided for visual analysis."} **Your Task:** 1. Analyze the provided context (DOM, URL, and screenshot if provided). 2. Determine if the **intent** behind the "Verification Step" is currently TRUE or FALSE. * Example: If the step is "Verify 'Login Complete'", but the page shows "Welcome, User!", the *intent* IS met. 3. 
Respond with a JSON object matching the required schema. * Set the `verified` field (boolean). * Provide detailed `reasoning` (string), explaining *how* the intent is met or why it failed. * **If `verified` is TRUE:** * Identify the **single most relevant element** (interactive OR static) in the context that **specifically confirms the successful outcome** of the preceding action(s). * If confirmed by an **interactive element `[index]`**: Set `element_index` to that index. `verification_static_id` should be null. * If confirmed by a **static element shown with `data-static-id="sXXX"`**: Set `verification_static_id` to that ID string (e.g., "s15"). `element_index` should be null. * **`verification_selector` (Set to null by you):** The system will generate the final CSS selector based on the provided index or static ID. Leave this field as `null`. * **`assertion_type` (Required):** Propose a specific, deterministic assertion. Determine the most appropriate assertion type based on the **actual observed state** and the verification intent. **Prefer `assert_text_contains` for text verification unless the step demands an *exact* match.** * Use `assert_llm_verification` for cases where vision LLMs will be better than any other selector for this problem. Visual UI stuff like overflow, truncation, overlap, positioning all this can't be determined via normal playwright automation. You need llm vision verification for such stuff. * Use `assert_text_equals` / `assert_text_contains` for text content. Prefer this over visible unless text doesn't confirm the verification. Note to use the **EXACT TEXT INCLUDING ANY PUNCTUATION** * Use `assert_checked` if the intent is to verify a checkbox or radio button **is currently selected/checked**. * Use `assert_not_checked` if the intent is to verify it **is NOT selected/checked**. * Use `assert_visible` / `assert_hidden` for visibility states. * Use `assert_disabled` / `assert_enabled` for checking disabled states * Use `assert_attribute_equals` ONLY for comparing the *string value* of an attribute (e.g., `class="active"`, `value="Completed"`). **DO NOT use it for boolean attributes like `checked`, `disabled`, `selected`. Use state assertions instead.** * Use `assert_element_count` for counting elements matching a selector. **Note that you can't count child elements by using static id of parent elements. You need to use the selector for the elements you need to count** * **`parameters` (Optional):** Provide necessary parameters ONLY if the chosen `assertion_type` requires them (e.g., `assert_text_equals` needs `expected_text`). For `assert_checked`, `assert_not_checked`, `assert_visible`, `assert_hidden`, `assert_disabled`, `assert_enabled`, parameters should generally be empty (`{{}}`) or omitted. Ensure parameters reflect the *actual observed state* (e.g., observed text). * **Guidance for Robustness:** * **Prefer specific checks:** `assert_text_contains` on a specific message is better than just `assert_visible` on a generic container *if* the text confirms the verification goal. * **Dynamic Content:** For content that loads (e.g., images replacing placeholders, data appearing), prefer assertions on **stable containers** (`assert_element_count`, `assert_visible` on the container) or the **final loaded state** if reliably identifiable, rather than the transient loading state (like placeholder text). * **Element Targeting:** Identify the **single most relevant element** (interactive `[index]` OR static `data-static-id="sXXX"`) that **proves** the assertion. 
Set `element_index` OR `verification_static_id` accordingly. * **`verification_selector` (Set to null):** The system generates the final selector. Leave this null. * **`parameters` (Required if needed):** Provide necessary parameters based on the chosen `assertion_type` and the **actual observed state** (e.g., the exact text seen for `assert_text_contains`). Empty `{{}}` if no parameters needed (e.g., `assert_visible`). * **If `verified` is FALSE:** * `assertion_type`, `element_index`, `verification_selector`, `parameters` should typically be null/omitted. **JSON Output Structure Examples:** *Success Case (Static Element Confirms):* ```json {{ "verified": true, "assertion_type": "assert_text_contains", // Preferred over just visible "element_index": null, "verification_static_id": "s23", // ID from context "verification_selector": null, "parameters": {{ "expected_text": "logged in!" }}, // Actual text observed "reasoning": "The static element <p data-static-id='s23'> shows 'logged in!', fulfilling the verification step's goal." }} ```json {{ "verified": true, "assertion_type": "assert_element_count", // Verifying 5 items loaded "element_index": null, // Index not needed if counting multiple matches "verification_static_id": "s50", // Target the container element "verification_selector": null, // System will generate selector for container s50 "parameters": {{ "expected_count": 5 }}, "reasoning": "The list container <ul data-static-id='s50'> visually contains 5 items, confirming the verification requirement." }} ``` ```json {{ "verified": true, "assertion_type": "assert_llm_verification", "element_index": null, "verification_static_id": null, "verification_selector": null, "parameters": {{ }}, "reasoning": "Checking if text is overflowing is best suited for a vision assisted llm verfication" }} ``` ``` *Success Case (Radio Button Checked):* ```json {{ "verified": true, "assertion_type": "assert_checked", "element_index": 9, // <-- LLM provides interactive index "verification_static_id": null, "verification_selector": null, // <-- LLM sets to null "parameters": {{}}, "reasoning": "The 'Credit Card' radio button [9] is selected on the page, fulfilling the verification requirement." }} ``` *Success Case (Checkbox Not Checked - Interactive):* ```json {{ "verified": true, "assertion_type": "assert_not_checked", "element_index": 11, // <-- LLM provides interactive index "verification_static_id": null, "verification_selector": null, // <-- LLM sets to null "parameters": {{}}, "reasoning": "The 'Subscribe' checkbox [11] is not checked, as required by the verification step." }} ``` *Success Case (Interactive Element Confirms - Visible):* ```json {{ "verified": true, "assertion_type": "assert_visible", "element_index": 8, // <-- LLM provides interactive index "verification_static_id": null, "verification_selector": null, // <-- LLM sets to null "parameters": {{}}, "reasoning": "Element [8] (logout button) is visible, confirming the user is logged in as per the verification step intent." 
}} ``` *Success Case (Attribute on Static Element):* ```json {{ "verified": true, "assertion_type": "assert_attribute_equals", "element_index": null, "verification_static_id": "s45", // <-- LLM provides static ID "verification_selector": null, // <-- LLM sets to null "parameters": {{ "attribute_name": "data-status", "expected_value": "active" }}, "reasoning": "The static <div data-static-id='s45' data-status='active' (Static)> element has the 'data-status' attribute set to 'active', confirming the verification requirement." }} ``` *Failure Case:* ```json {{ "verified": false, "assertion_type": null, "element_index": null, "verification_static_id": null, "verification_selector": null, "parameters": {{}}, "reasoning": "Could not find the 'Success' message or any other indication of successful login in the provided context or screenshot." }} ``` **CRITICAL:** If `verified` is true, provide *either* `element_index` for interactive elements OR `verification_static_id` for static elements. **Do not generate the `verification_selector` yourself; set it to `null`.** Explain any discrepancies between the plan and the observed state in `reasoning`. Respond ONLY with the JSON object matching the schema. Now, generate the verification JSON for: "{verification_description}" """ if previous_error: err = f"\n**Previous Attempt Feedback:**\nA previous verification attempt for this step resulted in an error: {previous_error}\nPlease carefully re-evaluate the current state and suggest a correct and verifiable assertion, or indicate if the verification still fails.\n" if previous_error else "" logger.critical(err) # Call generate_json, passing image_bytes if available logger.debug("[LLM VERIFY] Sending prompt (and potentially image) to generate_json...") response_obj = self.llm_client.generate_json( LLMVerificationSchema, prompt, image_bytes=screenshot_bytes # Pass the image bytes here ) verification_json = None # Initialize if isinstance(response_obj, LLMVerificationSchema): logger.debug(f"[LLM VERIFY] Successfully parsed response: {response_obj}") verification_dict = response_obj.model_dump(exclude_none=True) assertion_type = verification_dict.get("assertion_type") params = verification_dict.get("parameters", {}) needs_params = assertion_type in ['assert_text_equals', 'assert_text_contains', 'assert_attribute_equals', 'assert_element_count'] no_params_needed = assertion_type in ['assert_checked', 'assert_not_checked', 'assert_disabled', 'assert_enabled', 'assert_visible', 'assert_hidden', 'assert_llm_verification'] # --- Post-hoc Validation --- is_verified_by_llm = verification_dict.get("verified") if is_verified_by_llm is None: logger.error("[LLM VERIFY FAILED] Parsed JSON missing required 'verified' field.") self._add_to_history("LLM Verification Error", {"reason": "Missing 'verified' field", "parsed_dict": verification_dict}) return None if not verification_dict.get("reasoning"): logger.warning(f"[LLM VERIFY] Missing 'reasoning' in response: {verification_dict}") verification_dict["reasoning"] = "No reasoning provided by LLM." # --- Selector Generation Logic --- final_selector = None # Initialize selector to be potentially generated if is_verified_by_llm: static_id = verification_dict.get("verification_static_id") interactive_index = verification_dict.get("element_index") # Keep original name from schema if static_id: # --- Static element identified by ID --- target_node = static_id_map.get(static_id) if target_node: logger.info(f"LLM identified static element via ID: {static_id}. 
Generating selector...") # Import or access DomService appropriately here from ..dom.service import DomService try: # Generate selector using Python logic final_selector = DomService._enhanced_css_selector_for_element(target_node) if not final_selector: logger.error(f"Failed to generate selector for static node (ID: {static_id}). Falling back to XPath.") final_selector = f"xpath={target_node.xpath}" # Fallback verification_dict["verification_selector"] = final_selector verification_dict["_static_id_used"] = static_id verification_dict["element_index"] = None # Ensure index is None for static logger.info(f"Generated selector for static ID {static_id}: {final_selector}") except Exception as gen_err: logger.error(f"Error generating selector for static ID {static_id}: {gen_err}") # Decide how to handle: fail verification? Use XPath? verification_dict["verification_selector"] = f"xpath={target_node.xpath}" # Fallback verification_dict["element_index"] = None verification_dict["reasoning"] += f" [Selector generation failed: {gen_err}]" else: logger.error(f"LLM returned static ID '{static_id}' but it wasn't found in the context map!") # Mark as failed or handle error appropriately verification_dict["verified"] = False verification_dict["reasoning"] = f"Verification failed: LLM hallucinated static ID '{static_id}'." # Clear fields that shouldn't be present on failure verification_dict.pop("assertion_type", None) verification_dict.pop("verification_static_id", None) verification_dict.pop("parameters", None) elif interactive_index is not None: # --- Interactive element identified by index --- if self._latest_dom_state and self._latest_dom_state.selector_map: target_node = self._latest_dom_state.selector_map.get(interactive_index) if target_node and target_node.css_selector: final_selector = target_node.css_selector verification_dict["verification_selector"] = final_selector # verification_dict["element_index"] is already set logger.info(f"Using pre-generated selector for interactive index {interactive_index}: {final_selector}") else: logger.error(f"LLM returned interactive index [{interactive_index}] but node or selector not found in map!") verification_dict["verified"] = False verification_dict["reasoning"] = f"Verification failed: LLM hallucinated interactive index '{interactive_index}' or selector missing." verification_dict.pop("assertion_type", None) verification_dict.pop("element_index", None) verification_dict.pop("parameters", None) else: logger.error("Cannot resolve interactive index: DOM state or selector map missing.") verification_dict["verified"] = False verification_dict["reasoning"] = "Verification failed: Internal error retrieving DOM state for interactive index." 
verification_dict.pop("assertion_type", None) verification_dict.pop("element_index", None) verification_dict.pop("parameters", None) else: # Verified = true, but LLM provided neither static ID nor interactive index logger.error("LLM verification PASSED but provided neither static ID nor interactive index!") # verification_dict["verified"] = False verification_dict["reasoning"] = "Verified to be true by using vision LLMs" verification_dict.pop("assertion_type", None) verification_dict.pop("parameters", None) if final_selector and target_node and self.browser_controller.page: try: handles = self.browser_controller.page.query_selector_all(final_selector) num_matches = len(handles) validation_passed = False if num_matches == 1: # Get XPath of the element found by the generated selector try: with resources.files(__package__).joinpath('js_utils', 'xpathgenerator.js') as js_path: js_code = js_path.read_text(encoding='utf-8') logger.debug("xpathgenerator.js loaded successfully.") except FileNotFoundError: logger.error("xpathgenerator.js not found in the 'agents' package directory!") raise except Exception as e: logger.error(f"Error loading xpathgenerator.js: {e}", exc_info=True) raise # Pass a *function string* to evaluate. Playwright passes the handle as 'element'. # The function string first defines our helper, then calls it. script_to_run = f""" (element) => {{ {js_code} // Define the helper function(s) return generateXPathForElement(element); // Call it }} """ # Use page.evaluate, passing the script string and the element handle matched_xpath = self.browser_controller.page.evaluate(script_to_run, handles[0]) # Compare XPaths (simplest reliable comparison) if target_node.xpath == matched_xpath: validation_passed = True logger.info(f"Validation PASSED: Selector '{final_selector}' uniquely matches target node.") else: logger.warning(f"Validation FAILED: Selector '{final_selector}' matched 1 element, but its XPath ('{matched_xpath}') differs from target node XPath ('{target_node.xpath}').") elif num_matches == 0: logger.warning(f"Validation FAILED: Selector '{final_selector}' matched 0 elements.") else: # num_matches > 1 logger.warning(f"Validation FAILED: Selector '{final_selector}' matched {num_matches} elements (not unique).") # --- Fallback to XPath if validation failed --- if not validation_passed: logger.warning(f"Falling back to XPath selector for target node.") original_selector = final_selector # Keep for logging/reasoning final_selector = f"xpath={target_node.xpath}" # Update reasoning if possible if "reasoning" in verification_dict: verification_dict["reasoning"] += f" [Note: CSS selector ('{original_selector}') failed validation, using XPath fallback.]" # Update the selector in the dictionary being built verification_dict["verification_selector"] = final_selector except Exception as validation_err: logger.error(f"Error during selector validation ('{final_selector}'): {validation_err}. 
Falling back to XPath.") original_selector = final_selector final_selector = f"xpath={target_node.xpath}" if "reasoning" in verification_dict: verification_dict["reasoning"] += f" [Note: Error validating CSS selector ('{original_selector}'), using XPath fallback.]" verification_dict["verification_selector"] = final_selector # --- Post-hoc validation (same as before, applied to final verification_dict) --- if verification_dict.get("verified"): assertion_type = verification_dict.get("assertion_type") params = verification_dict.get("parameters", {}) needs_params = assertion_type in ['assert_text_equals', 'assert_text_contains', 'assert_attribute_equals', 'assert_element_count'] no_params_needed = assertion_type in ['assert_checked', 'assert_not_checked', 'assert_enabled', 'assert_disabled', 'assert_visible', 'assert_hidden', 'assert_llm_verification'] if not verification_dict.get("verification_selector"): logger.error("Internal Error: Verification marked passed but final selector is missing!") # verification_dict["verified"] = False verification_dict["reasoning"] = "Verified to be true by using vision LLMs" elif needs_params and not params: logger.warning(f"[LLM VERIFY WARN] Verified=true and assertion '{assertion_type}' typically needs parameters, but none provided: {verification_dict}") elif no_params_needed and params: logger.warning(f"[LLM VERIFY WARN] Verified=true and assertion '{assertion_type}' typically needs no parameters, but some provided: {params}. Using empty params.") verification_dict["parameters"] = {} # Assign the potentially modified dictionary verification_json = verification_dict elif isinstance(response_obj, str): # Handle error string logger.error(f"[LLM VERIFY FAILED] LLM returned an error string: {response_obj}") self._add_to_history("LLM Verification Failed", {"raw_error_response": response_obj}) return None else: # Handle unexpected type logger.error(f"[LLM VERIFY FAILED] Unexpected response type from generate_json: {type(response_obj)}") self._add_to_history("LLM Verification Failed", {"response_type": str(type(response_obj))}) return None if verification_json: logger.info(f"[LLM VERIFY RESULT] Verified: {verification_json['verified']}, Selector: {verification_json.get('verification_selector')}, Reasoning: {verification_json.get('reasoning', '')[:150]}...") self._add_to_history("LLM Verification Result", verification_json) return verification_json # Return the dictionary else: logger.error("[LLM VERIFY FAILED] Reached end without valid verification_json.") return None def _handle_llm_verification(self, planned_step: Dict[str, Any], verification_result: Dict[str, Any]) -> bool: """ Handles the user interaction and recording after LLM verification. Now uses verification_selector as the primary target. Returns True if handled (recorded/skipped), False if aborted. 
""" planned_desc = planned_step["description"] is_verified_by_llm = verification_result['verified'] reasoning = verification_result.get('reasoning', 'N/A') step_handled = False # Flag to track if a decision was made parameter_name = None if is_verified_by_llm: final_selector = verification_result.get("verification_selector") assertion_type = verification_result.get("assertion_type") parameters = verification_result.get("parameters", {}) interactive_index = verification_result.get("element_index") # Optional index hint static_id = verification_result.get("_static_id_used") # Optional static ID hint # --- Perform Quick Validation using Playwright --- validation_passed = False validation_error = "Validation prerequisites not met (missing type/selector, unless assert_llm_verification)." # Only validate if type and selector are present (or if type is assert_llm_verification) if assertion_type and (final_selector or assertion_type == 'assert_llm_verification' or assertion_type == 'assert_passed_verification'): logger.info("Performing quick Playwright validation of LLM's suggested assertion...") validation_passed, validation_error = self.browser_controller.validate_assertion( assertion_type, final_selector, parameters ) elif not assertion_type: validation_passed = True assertion_type = "assert_llm_verification" # NOTE THAT THIS IS CURRENTLY TREATED AS VERIFIED BY VISION LLM # (selector check is handled inside validate_assertion) if not validation_passed: # --- Validation FAILED --- logger.warning(f"LLM assertion validation FAILED for step '{planned_desc}': {validation_error}") error_message_for_task = f"LLM assertion validation failed: {validation_error}. Original AI reasoning: {reasoning}" # Mark task as failed so the main loop can handle retry/re-planning, passing the validation error self.task_manager.update_subtask_status( self.task_manager.current_subtask_index, "failed", error=error_message_for_task, force_update=True # Ensure status changes ) # No UI shown here, let main loop retry and pass error back to _get_llm_verification return True # Indicate step was handled (by failing validation) # --- Validation PASSED --- logger.info("LLM assertion validation PASSED.") # Proceed with highlighting, panel display (interactive), or direct recording (automated) # --- Automated Mode (Validated Success) --- if self.automated_mode: logger.info(f"[Auto Mode] Handling Validated LLM Verification for: '{planned_desc}'") logger.info(f"[Auto Mode] AI Result: PASSED. 
Validated Assertion: {assertion_type} on {final_selector}") highlight_color = "#008000" if static_id else "#0000FF" # Green for static, Blue for interactive/direct highlight_text = "Verify Target (Static)" if static_id else "Verify Target" highlight_idx_display = 0 if static_id else (interactive_index if interactive_index is not None else 0) self.browser_controller.clear_highlights() try: # Find node XPath if possible for better highlighting fallback target_node_xpath = None target_node = None if static_id and hasattr(self, '_last_static_id_map') and self._last_static_id_map: target_node = self._last_static_id_map.get(static_id) elif interactive_index is not None and self._latest_dom_state and self._latest_dom_state.selector_map: target_node = self._latest_dom_state.selector_map.get(interactive_index) if target_node: target_node_xpath = target_node.xpath self.browser_controller.highlight_element(final_selector, highlight_idx_display, color=highlight_color, text=highlight_text, node_xpath=target_node_xpath) except Exception as hl_err: logger.warning(f"Could not highlight verification target '{final_selector}': {hl_err}") print(f"AI suggests assertion on element: {final_selector} (Highlight failed)") # Record the validated assertion automatically logger.info(f"[Auto Mode] Recording validated assertion: {assertion_type} on {final_selector}") record = { "step_id": self._current_step_id, "action": assertion_type, "description": planned_desc, "parameters": parameters, "selector": final_selector, "wait_after_secs": 0 } self.recorded_steps.append(record) self._current_step_id += 1 logger.info(f"Step {record['step_id']} recorded (AI Verified & Validated - Automated)") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded validated AI assertion (automated) as step {record['step_id']}") step_handled = True self.browser_controller.clear_highlights() # Clear highlights even in auto mode return True # Indicate handled # --- Interactive Mode (Validated Success) --- else: print("\n" + "="*60) print(f"Planned Step: {planned_desc}") print(f"Review AI Verification Result (Validated) in Panel...") # Highlight element final_selector = verification_result.get("verification_selector") # Re-fetch just in case interactive_index = verification_result.get("element_index") static_id = verification_result.get("_static_id_used") highlight_color = "#008000" if static_id else "#0000FF" # Green for static, Blue for interactive/direct highlight_text = "Verify Target (Static)" if static_id else "Verify Target" highlight_idx_display = 0 if static_id else (interactive_index if interactive_index is not None else 0) self.browser_controller.clear_highlights() try: target_node_xpath = None target_node = None if static_id and hasattr(self, '_last_static_id_map') and self._last_static_id_map: target_node = self._last_static_id_map.get(static_id) elif interactive_index is not None and self._latest_dom_state and self._latest_dom_state.selector_map: target_node = self._latest_dom_state.selector_map.get(interactive_index) if target_node: target_node_xpath = target_node.xpath self.browser_controller.highlight_element(final_selector, highlight_idx_display, color=highlight_color, text=highlight_text, node_xpath=target_node_xpath) print(f"AI suggests assertion on element (Validation PASSED): {final_selector}") except Exception as hl_err: logger.warning(f"Could not highlight verification target '{final_selector}': {hl_err}") print(f"AI suggests assertion on element (Validation PASSED): 
{final_selector} (Highlight failed)") print("="*60) # Show the review panel (button should be enabled now) self.panel.show_verification_review_panel(planned_desc, verification_result) # Wait for user choice from panel user_choice = self.panel.wait_for_panel_interaction(30.0) # Give more time for review if not user_choice: user_choice = 'skip' # Default to skip on timeout # --- Process User Choice --- if user_choice == 'record_ai': # Record validated assertion final_selector = verification_result.get("verification_selector") assertion_type = verification_result.get("assertion_type") parameters = verification_result.get("parameters", {}) print(f"Recording validated AI assertion: {assertion_type} on {final_selector}") record = { "step_id": self._current_step_id, "action": assertion_type, "description": planned_desc, "parameters": parameters, "selector": final_selector, "wait_after_secs": 0 } self.recorded_steps.append(record) self._current_step_id += 1 logger.info(f"Step {record['step_id']} recorded (AI Verified & Validated)") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded validated AI assertion as step {record['step_id']}") step_handled = True elif user_choice == 'define_manual': print("Switching to manual assertion definition...") self.panel.hide_recorder_panel() # Hide review panel # Call the existing manual handler if not self._handle_assertion_recording(planned_step): self._user_abort_recording = True # Propagate abort signal step_handled = True # Manual path handles the step elif user_choice == 'skip': print("Skipping verification step.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped AI verification via panel") step_handled = True elif user_choice == 'abort': print("Aborting recording.") self._user_abort_recording = True step_handled = False # Abort signal else: # Includes timeout, None, unexpected values print("Invalid choice or timeout during verification review. Skipping step.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Invalid choice/timeout on AI verification") step_handled = True # --- Cleanup UI --- self.browser_controller.clear_highlights() self.panel.hide_recorder_panel() return step_handled and not self._user_abort_recording else: # --- LLM verification FAILED initially --- logger.warning(f"[Verification] AI verification FAILED for '{planned_desc}'. Reasoning: {reasoning}") # Mode-dependent handling if self.automated_mode: logger.warning("[Auto Mode] Recording AI verification failure.") failed_record = { "step_id": self._current_step_id, "action": "assert_failed_verification", # Specific action type "description": planned_desc, "parameters": {"reasoning": reasoning}, "selector": None, "wait_after_secs": 0 } self.recorded_steps.append(failed_record) self._current_step_id += 1 logger.info(f"Step {failed_record['step_id']} recorded (AI Verification FAILED - Automated)") self.task_manager.update_subtask_status( self.task_manager.current_subtask_index, "failed", # Mark as skipped result=f"AI verification failed: {reasoning}. 
Recorded as failed assertion.", force_update=True ) step_handled = True else: # Interactive Mode - Fallback to Manual print("\n" + "="*60) print(f"Planned Step: {planned_desc}") print(f"AI Verification Result: FAILED (Reason: {reasoning})") print("Falling back to manual assertion definition...") print("="*60) self.browser_controller.clear_highlights() self.panel.hide_recorder_panel() # Ensure review panel isn't shown # Call the existing manual handler if not self._handle_assertion_recording(planned_step): self._user_abort_recording = True # Propagate abort step_handled = True # Manual path handles the step return step_handled and not self._user_abort_recording def _trigger_re_planning(self, current_planned_task: Dict[str, Any], reason: str) -> bool: """ Attempts to get recovery steps from the LLM (using generate_json, potentially multimodal) when an unexpected state is detected. Returns True if recovery steps were inserted, False otherwise (or if abort requested). """ logger.warning(f"Triggering re-planning due to: {reason}") self._add_to_history("Re-planning Triggered", {"reason": reason, "failed_step_desc": current_planned_task['description']}) if self.automated_mode: print = lambda *args, **kwargs: logger.info(f"[Auto Mode Replanning] {' '.join(map(str, args))}") # Redirect print input = lambda *args, **kwargs: 'y' # Default to accepting suggestions in auto mode else: print("\n" + "*"*60) print("! Unexpected State Detected !") print(f"Reason: {reason}") print(f"Original Goal: {self.feature_description}") print(f"Attempting Step: {current_planned_task['description']}") print("Asking AI for recovery suggestions...") print("*"*60) # --- Gather Context for Re-planning --- current_url = "Error getting URL" dom_context_str = "Error getting DOM" screenshot_bytes = None # Initialize try: current_url = self.browser_controller.get_current_url() if self._latest_dom_state and self._latest_dom_state.element_tree: dom_context_str, _ = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification') screenshot_bytes = self.browser_controller.take_screenshot() except Exception as e: logger.error(f"Error gathering context for re-planning: {e}") original_plan_str = "\n".join([f"- {t['description']}" for t in self.task_manager.subtasks]) history_summary = self._get_history_summary() last_done_step_desc = "N/A (Start of test)" for i in range(current_planned_task['index'] -1, -1, -1): if self.task_manager.subtasks[i]['status'] == 'done': last_done_step_desc = self.task_manager.subtasks[i]['description'] break # --- Construct Re-planning Prompt (Adjusted for generate_json with image) --- prompt = f""" You are an AI Test Recorder Assistant helping recover from an unexpected state during test recording. **Overall Goal:** {self.feature_description} **Original Planned Steps:** {original_plan_str} **Current Situation:** - Last successfully completed planned step: '{last_done_step_desc}' - Currently trying to execute planned step: '{current_planned_task['description']}' (Attempt {current_planned_task['attempts']}) - Encountered unexpected state/error: {reason} - Current URL: {current_url} **Current Page Context (Interactive Elements with Indices. You can interact with non visible elements as well):** ```html {dom_context_str} ``` {f"**Screenshot Analysis:** Please analyze the attached screenshot to understand the current visual state and identify elements relevant for recovery." 
if screenshot_bytes else "**Note:** No screenshot provided for visual analysis."} **Your Task:** Analyze the current situation, context (DOM, URL, screenshot if provided), and the overall goal. If you are not clear and think scrolling might help, you can scroll to see the complete page. Generate a JSON object matching the required schema. - **If recovery is possible:** Provide a **short sequence (1-3 steps)** of recovery actions in the `recovery_steps` field (list of strings). These steps should aim to get back on track towards the original goal OR correctly perform the intended action of the failed step ('{current_planned_task['description']}') in the *current* context. Focus ONLY on the immediate recovery. Example: `["Click element 'Close Popup Button'", "Verify 'Main Page Title' is visible"]`. `action` field should be null. - **If recovery seems impossible, too complex, or unsafe:** Set the `action` field to `"abort"` and provide a brief explanation in the `reasoning` field. `recovery_steps` should be null. Example: `{{"action": "abort", "reasoning": "Critical error page displayed, cannot identify recovery elements."}}` **JSON Output Structure Examples:** *Recovery Possible:* ```json {{ "recovery_steps": ["Click element 'Accept Cookies Button'", "Verify 'Main Page Title' is visible"], "action": null, "reasoning": null }} ``` *Recovery Impossible:* ```json {{ "recovery_steps": null, "action": "abort", "reasoning": "The application crashed, unable to proceed." }} ``` Respond ONLY with the JSON object matching the schema. """ # --- Call LLM using generate_json, passing image_bytes --- response_obj = None error_msg = None try: logger.debug("[LLM REPLAN] Sending prompt (and potentially image) to generate_json...") response_obj = self.llm_client.generate_json( ReplanSchema, prompt, image_bytes=screenshot_bytes # Pass image here ) logger.debug(f"[LLM REPLAN] Raw response object type: {type(response_obj)}") except Exception as e: logger.error(f"LLM call failed during re-planning: {e}", exc_info=True) print("Error: Could not communicate with LLM for re-planning.") error_msg = f"LLM communication error: {e}" # --- Parse Response --- recovery_steps = None abort_action = False abort_reasoning = "No reason provided." if isinstance(response_obj, ReplanSchema): logger.debug(f"[LLM REPLAN] Successfully parsed response: {response_obj}") if response_obj.recovery_steps and isinstance(response_obj.recovery_steps, list): if all(isinstance(s, str) and s for s in response_obj.recovery_steps): recovery_steps = response_obj.recovery_steps else: logger.warning(f"[LLM REPLAN] Parsed recovery_steps list contains invalid items: {response_obj.recovery_steps}") error_msg = "LLM provided invalid recovery steps." elif response_obj.action == "abort": abort_action = True abort_reasoning = response_obj.reasoning or "No specific reason provided by AI." logger.warning(f"AI recommended aborting recording. Reason: {abort_reasoning}") else: logger.warning("[LLM REPLAN] LLM response did not contain valid recovery steps or an abort action.") error_msg = "LLM response was valid JSON but lacked recovery_steps or abort action." 
elif isinstance(response_obj, str): # Handle error string from generate_json logger.error(f"[LLM REPLAN] Failed to generate/parse recovery JSON: {response_obj}") error_msg = f"LLM generation/parsing error: {response_obj}" elif response_obj is None and error_msg: # Handle communication error from above pass # error_msg already set else: # Handle unexpected return type logger.error(f"[LLM REPLAN] Unexpected response type from generate_json: {type(response_obj)}") error_msg = f"Unexpected response type: {type(response_obj)}" # --- Handle Outcome (Mode-dependent) --- if abort_action: print(f"\nAI Suggests Aborting: {abort_reasoning}") if self.automated_mode: print("[Auto Mode] Accepting AI abort suggestion.") abort_choice = 'a' else: abort_choice = input("AI suggests aborting. Abort (A) or Ignore and Skip Failed Step (S)? > ").strip().lower() if abort_choice == 'a': self._user_abort_recording = True # Mark for abort self._abort_reason = abort_reasoning self.task_manager.update_subtask_status(current_planned_task['index'], "failed", error=f"Aborted based on AI re-planning suggestion: {abort_reasoning}", force_update=True) return False # Abort else: # Skipped (Interactive only) logger.info("User chose to ignore AI abort suggestion and skip the failed step.") self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped after AI suggested abort", force_update=True) return False # Didn't insert steps elif recovery_steps: print("\nAI Suggested Recovery Steps:") for i, step in enumerate(recovery_steps): print(f" {i+1}. {step}") if self.automated_mode: print("[Auto Mode] Accepting AI recovery steps.") confirm_recovery = 'y' else: confirm_recovery = input("Attempt these recovery steps? (Y/N/Abort) > ").strip().lower() if confirm_recovery == 'y': logger.info(f"Attempting AI recovery steps: {recovery_steps}") if self._insert_recovery_steps(current_planned_task['index'] + 1, recovery_steps): # Record that the original step's intent is being handled by a re-plan self.recorded_steps.append({ "step_id": self._current_step_id, "action": "task_replanned", # Changed action name "description": current_planned_task['description'], # Use original task description "parameters": { "reason_for_replan": reason, "recovery_steps_planned": recovery_steps, "original_task_index_in_plan": current_planned_task['index'] # For traceability }, "selector": None, "wait_after_secs": 0 }) self._current_step_id += 1 # Mark the original task as 'done' in TaskManager as its outcome is now via these recovery steps self.task_manager.update_subtask_status( current_planned_task['index'], "done", result=f"Step handled by re-planning. Details recorded as 'task_replanned'. Recovery steps: {recovery_steps}", force_update=True # Ensure status is updated ) self._consecutive_suggestion_failures = 0 return True # Indicate recovery steps were inserted else: # Insertion failed (should be rare) print("Error: Failed to insert recovery steps. Skipping original failed step.") self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped (failed to insert AI recovery steps)", force_update=True) return False elif confirm_recovery == 'a': # Interactive only self._user_abort_recording = True return False # Abort else: # N or invalid (Interactive or failed auto-acceptance) print("Skipping recovery attempt and the original failed step.") logger.info("User declined/skipped AI recovery steps. 
Skipping original failed step.") self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped (User/Auto declined AI recovery)", force_update=True) return False # Skipped else: # LLM failed to provide valid steps or abort print(f"\nAI failed to provide valid recovery steps or an abort action. Reason: {error_msg or 'Unknown LLM issue'}") if self.automated_mode: print("[Auto Mode] Skipping failed step due to LLM re-planning failure.") skip_choice = 's' else: skip_choice = input("Skip the current failed step (S) or Abort recording (A)? > ").strip().lower() if skip_choice == 'a': # Interactive only possibility self._user_abort_recording = True return False # Abort else: # Skip (default for auto mode, or user choice) print("Skipping the original failed step.") logger.warning(f"LLM failed re-planning ({error_msg}). Skipping original failed step.") self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result=f"Skipped (AI re-planning failed: {error_msg})", force_update=True) return False # Skipped def _insert_recovery_steps(self, index: int, recovery_steps: List[str]) -> bool: """Calls TaskManager to insert steps.""" return self.task_manager.insert_subtasks(index, recovery_steps) def _determine_action_and_selector_for_recording(self, current_task: Dict[str, Any], current_url: str, dom_context_str: str # Now contains indexed elements with PRE-GENERATED selectors ) -> Optional[Dict[str, Any]]: # Keep return type as Dict for downstream compatibility """ Uses LLM (generate_json) to propose the browser action (click, type) and identify the target *element index* based on the planned step description and the DOM context. The robust selector is retrieved from the DOM state afterwards. Returns a dictionary representation or None on error. """ logger.info(f"Determining AI suggestion for planned step: '{current_task['description']}'") prompt = f""" You are an AI assistant helping a user record a web test. Your goal is to interpret the user's planned step and identify the **single target interactive element** in the provided context that corresponds to it, then suggest the appropriate action. **Feature Under Test:** {self.feature_description} **Current Planned Step:** {current_task['description']} **Current URL:** {current_url} **Test Recording Progress:** Attempt {current_task['attempts']} of {self.task_manager.max_retries_per_subtask + 1} for this suggestion. **Input Context (Visible Interactive Elements with Indices):** This section shows interactive elements on the page, each marked with `[index]` and its pre-generated robust CSS selector. You can also interact with the non visible elements ```html {dom_context_str} ``` **Your Task:** Based ONLY on the "Current Planned Step" description and the "Input Context": 1. Determine the appropriate **action** (`click`, `type`, `select`, `check`, `uncheck`, `key_press`, `drag_and_drop`, `action_not_applicable`, `suggestion_failed`). 2. If action is `click`, `type`, `select`, `check`, `uncheck`, or `key_press`: * Identify the **single most likely interactive element `[index]`** from the context that matches the description. Set `parameters.index`. 3. If action is `type`: Extract the **text** to be typed. Set `parameters.text`. 4. If action is `select`: Identify the main `<select>` element index and extract the target option's visible label into `parameters.option_label`. 5. If action is `key_press`: Identify the target element `[index]` and extract the key(s) to press. Set `parameters.keys`. 6. 
If action is `drag_and_drop`: Identify the source element `[index]` and the destination element `[destination_index]`.
7. Provide brief **reasoning** linking the step description to the chosen index/action/parameters.

**Output JSON Structure Examples:**

*Click Action:*
```json
{{ "action": "click", "parameters": {{"index": 12}}, "reasoning": "The step asks to click the 'Login' button, which corresponds to element [12]." }}
```
*Type Action:*
```json
{{ "action": "type", "parameters": {{"index": 5, "text": "user@example.com"}}, "reasoning": "The step asks to type 'user@example.com' into the email field, which is element [5]." }}
```
*Check Action:*
```json
{{ "action": "check", "parameters": {{"index": 8}}, "reasoning": "Step asks to check the 'Agree' checkbox [8]." }}
```
*Uncheck Action:*
```json
{{ "action": "uncheck", "parameters": {{"index": 9}}, "reasoning": "Step asks to uncheck 'Subscribe' [9]." }}
```
*Key Press:*
```json
{{ "action": "key_press", "parameters": {{"index": 3, "keys": "Enter"}}, "reasoning": "The step asks to press Enter on the search input [3]." }}
```
*Drag and Drop:*
```json
{{ "action": "drag_and_drop", "parameters": {{"index": 10, "destination_index": 15}}, "reasoning": "The step asks to drag the item [10] to the cart area [15]." }}
```
*Select Action:*
```json
{{ "action": "select", "parameters": {{"index": 12, "option_label": "Weekly"}}, "reasoning": "The step asks to select 'Weekly' in the 'Notification Frequency' dropdown [12]." }}
```
*Not Applicable (Navigation/Verification):*
```json
{{ "action": "action_not_applicable", "parameters": {{}}, "reasoning": "The step 'Navigate to ...' does not involve clicking or typing on an element from the context." }}
```
*Suggestion Failed (Cannot identify element):*
```json
{{ "action": "suggestion_failed", "parameters": {{}}, "reasoning": "Could not find a unique element matching 'the second confirmation button'." }}
```

**CRITICAL INSTRUCTIONS:**
- Focus on the `[index]` and do NOT output selectors for `click`/`type` actions.
- For `select` action, identify the main `<select>` element index and extract the target option's label into `parameters.option_label`.
- For `key_press`, provide the target `index` and the `keys` string.
- For `drag_and_drop`, provide the source `index` and the `destination_index`.
- Use `action_not_applicable` for navigation, verification, scroll, wait steps.
- Be precise with extracted `text` for the `type` action.

Respond ONLY with the JSON object matching the schema.
"""
        # --- End Prompt ---

        # Add error context if retrying suggestion
        if current_task['status'] == 'in_progress' and current_task['attempts'] > 1 and current_task.get('error'):
            error_context = str(current_task['error'])[:300] + "..."
            prompt += f"\n**Previous Suggestion Attempt Error:**\nAttempt {current_task['attempts'] - 1} failed: {error_context}\nRe-evaluate the description and context carefully.\n"

        # Add history summary for general context
        prompt += f"\n**Recent History (Context):**\n{self._get_history_summary()}\n"

        logger.debug(f"[LLM RECORDER PROMPT] Sending prompt snippet for action/index suggestion:\n{prompt[:500]}...")
        response_obj = self.llm_client.generate_json(RecorderSuggestionSchema, prompt)

        suggestion_dict = None # Initialize
        suggestion_failed = False
        failure_reason = "LLM suggestion generation failed."
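        # Shape of the dictionary this method returns on success (illustrative; it is the
        # parsed suggestion enriched below with a resolved selector and DOM node, plus a
        # destination pair for drag_and_drop):
        #   {"action": "click", "parameters": {"index": 12}, "reasoning": "...",
        #    "suggested_selector": "#login-button", "target_node": <DOMElementNode>}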
        if isinstance(response_obj, RecorderSuggestionSchema):
            logger.debug(f"[LLM RECORDER RESPONSE] Parsed suggestion: {response_obj}")
            # Convert to dict for downstream use (or refactor downstream to use object)
            suggestion_dict = response_obj.model_dump(exclude_none=True)
            action = suggestion_dict.get("action")
            reasoning = suggestion_dict.get("reasoning", "No reasoning provided.")
            logger.info(f"[LLM Suggestion] Action: {action}, Params: {suggestion_dict.get('parameters')}, Reasoning: {reasoning[:150]}...")
            self._add_to_history("LLM Suggestion", suggestion_dict)

            # --- Basic Validation (Schema handles enum/types) ---
            required_index_actions = ["click", "type", "check", "uncheck", "select", "key_press", "drag_and_drop"]
            if action in required_index_actions:
                target_index = suggestion_dict.get("parameters", {}).get("index")
                if target_index is None: # Index is required for these actions
                    logger.error(f"LLM suggested action '{action}' but missing required index.")
                    suggestion_failed = True
                    failure_reason = f"LLM suggestion '{action}' missing required parameter 'index'."
                elif action == "key_press" and suggestion_dict.get("parameters", {}).get("keys") is None:
                    logger.error("LLM suggested action 'key_press' but missing required keys.")
                    suggestion_failed = True
                    failure_reason = "LLM suggestion 'key_press' missing required parameter 'keys'."
                elif action == "drag_and_drop" and suggestion_dict.get("parameters", {}).get("destination_index") is None:
                    logger.error("LLM suggested action 'drag_and_drop' but missing required destination_index.")
                    suggestion_failed = True
                    failure_reason = "LLM suggestion 'drag_and_drop' missing required parameter 'destination_index'."
                elif action == "type" and suggestion_dict.get("parameters", {}).get("text") is None:
                    logger.error("LLM suggested action 'type' but missing required text.")
                    suggestion_failed = True
                    failure_reason = "LLM suggestion 'type' missing required parameter 'text'."
            elif action == "suggestion_failed":
                suggestion_failed = True
                failure_reason = suggestion_dict.get("reasoning", "LLM indicated suggestion failed.")
            elif action == "action_not_applicable":
                pass # This is a valid outcome, handled below
            else: # Should not happen if schema enum is enforced
                logger.error(f"LLM returned unexpected action type: {action}")
                suggestion_failed = True
                failure_reason = f"LLM returned unknown action '{action}'."
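        # For reference, a minimal drag-and-drop suggestion that passes the validation
        # above (illustrative, matching RecorderSuggestionParamsSchema):
        #   {"action": "drag_and_drop",
        #    "parameters": {"index": 10, "destination_index": 15},
        #    "reasoning": "Drag item [10] onto the cart area [15]."}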
elif isinstance(response_obj, str): # Handle error string logger.error(f"[LLM Suggestion Failed] LLM returned an error string: {response_obj}") self._add_to_history("LLM Suggestion Failed", {"raw_error_response": response_obj}) suggestion_failed = True failure_reason = f"LLM error: {response_obj}" else: # Handle unexpected type logger.error(f"[LLM Suggestion Failed] Unexpected response type from generate_json: {type(response_obj)}") self._add_to_history("LLM Suggestion Failed", {"response_type": str(type(response_obj))}) suggestion_failed = True failure_reason = f"Unexpected response type: {type(response_obj)}" # --- Process Suggestion --- if suggestion_failed: # Return a standardized failure dictionary return {"action": "suggestion_failed", "parameters": {}, "reasoning": failure_reason} # Handle successful suggestions (click, type, not_applicable) if suggestion_dict["action"] in required_index_actions: target_index = suggestion_dict["parameters"]["index"] # We validated index exists above # --- Retrieve the node and pre-generated selector --- if self._latest_dom_state is None or not self._latest_dom_state.selector_map: logger.error("DOM state or selector map is missing, cannot lookup suggested index.") return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Internal error: DOM state unavailable."} target_node = self._latest_dom_state.selector_map.get(target_index) if target_node is None: available_indices = list(self._latest_dom_state.selector_map.keys()) logger.error(f"LLM suggested index [{target_index}], but it was not found in DOM context map. Available: {available_indices}") return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Suggested element index [{target_index}] not found in current page context."} suggested_selector = target_node.css_selector if not suggested_selector: # Try to generate it now if missing suggested_selector = self.browser_controller.get_selector_for_node(target_node) if suggested_selector: target_node.css_selector = suggested_selector # Cache it else: logger.error(f"Could not generate selector for suggested index [{target_index}] (Node: {target_node.tag_name}).") return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Failed to generate CSS selector for suggested index [{target_index}]."} if suggested_selector and target_node and self.browser_controller.page: try: handles = self.browser_controller.page.query_selector_all(suggested_selector) num_matches = len(handles) validation_passed = False if num_matches == 1: # Get XPath of the element found by the generated selector # Note: Requires a reliable JS function 'generateXPath' accessible in evaluate try: with resources.files(__package__).joinpath('js_utils', 'xpathgenerator.js') as js_path: js_code = js_path.read_text(encoding='utf-8') logger.debug("xpathgenerator.js loaded successfully.") except FileNotFoundError: logger.error("xpathgenerator.js not found in the 'agents' package directory!") raise except Exception as e: logger.error(f"Error loading xpathgenerator.js: {e}", exc_info=True) raise # Pass a *function string* to evaluate. Playwright passes the handle as 'element'. # The function string first defines our helper, then calls it. 
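                        # Conceptually (illustrative), the evaluated function ends up as:
                        #   (element) => {
                        #       /* body of xpathgenerator.js, defining generateXPathForElement */
                        #       return generateXPathForElement(element);
                        #   }
                        # page.evaluate(script_to_run, handles[0]) then yields that element's
                        # XPath, which is compared with target_node.xpath below.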
script_to_run = f""" (element) => {{ {js_code} // Define the helper function(s) return generateXPathForElement(element); // Call it }} """ # Use page.evaluate, passing the script string and the element handle matched_xpath = self.browser_controller.page.evaluate(script_to_run, handles[0]) # Compare XPaths if target_node.xpath == matched_xpath: validation_passed = True logger.info(f"Validation PASSED: Suggested selector '{suggested_selector}' uniquely matches target node [{target_index}].") else: logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched 1 element, but its XPath ('{matched_xpath}') differs from target node XPath ('{target_node.xpath}').") elif num_matches == 0: logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched 0 elements.") else: # num_matches > 1 logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched {num_matches} elements (not unique).") # --- Fallback to XPath if validation failed --- if not validation_passed: logger.warning(f"Falling back to XPath selector for target node [{target_index}].") original_selector = suggested_selector suggested_selector = f"xpath={target_node.xpath}" # Update the suggestion dictionary suggestion_dict["suggested_selector"] = suggested_selector suggestion_dict["reasoning"] = suggestion_dict.get("reasoning", "") + f" [Note: CSS selector ('{original_selector}') failed validation, using XPath fallback.]" except Exception as validation_err: logger.error(f"Error during selector validation ('{suggested_selector}'): {validation_err}. Falling back to XPath.") original_selector = suggested_selector suggested_selector = f"xpath={target_node.xpath}" # Update the suggestion dictionary suggestion_dict["suggested_selector"] = suggested_selector suggestion_dict["reasoning"] = suggestion_dict.get("reasoning", "") + f" [Note: Error validating CSS selector ('{original_selector}'), using XPath fallback.]" logger.info(f"LLM suggested index [{target_index}], resolved to selector: '{suggested_selector}'") # Add resolved selector and node to the dictionary returned suggestion_dict["suggested_selector"] = suggested_selector suggestion_dict["target_node"] = target_node if suggestion_dict["action"] == "drag_and_drop": destination_index = suggestion_dict["parameters"].get("destination_index") if destination_index is not None: destination_node = self._latest_dom_state.selector_map.get(destination_index) if destination_node is None: available_indices = list(self._latest_dom_state.selector_map.keys()) logger.error(f"LLM suggested index [{destination_index}], but it was not found in DOM context map. 
Available: {available_indices}") return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Suggested element index [{destination_index}] not found in current page context."} destination_selector = destination_node.css_selector if not destination_selector: # Try to generate it now if missing destination_selector = self.browser_controller.get_selector_for_node(destination_node) if destination_selector: destination_node.css_selector = destination_selector # Cache it else: logger.error(f"Could not generate selector for suggested index [{destination_index}] (Node: {destination_node.tag_name}).") return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Failed to generate CSS selector for suggested index [{destination_index}]."} suggestion_dict["destination_selector"] = destination_selector suggestion_dict["destination_node"] = destination_node logger.info(f"LLM suggested drag target index [{destination_index}], resolved to selector: '{destination_selector}'") else: # Should have been caught by validation, but double-check logger.error("LLM suggested drag_and_drop without destination_index.") return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Drag and drop suggestion missing destination index."} return suggestion_dict elif suggestion_dict["action"] == "action_not_applicable": # Pass this through directly return suggestion_dict else: # Should be unreachable given the checks above logger.error("Reached unexpected point in suggestion processing.") return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Internal processing error after LLM response."} def _execute_action_for_recording(self, action: str, selector: Optional[str], parameters: Dict[str, Any]) -> Dict[str, Any]: """ Executes a specific browser action (navigate, click, type) during recording. This is called *after* user confirmation/override. It does not involve AI decision. """ result = {"success": False, "message": f"Action '{action}' invalid.", "data": None} if not action: result["message"] = "No action specified for execution." logger.warning(f"[RECORDER_EXEC] {result['message']}") return result logger.info(f"[RECORDER_EXEC] Executing: {action} | Selector: {selector} | Params: {parameters}") self._add_to_history("Executing Recorder Action", {"action": action, "selector": selector, "parameters": parameters}) try: if action == "navigate": url = parameters.get("url") if not url or not isinstance(url, str): raise ValueError("Missing or invalid 'url'.") self.browser_controller.goto(url) result["success"] = True result["message"] = f"Navigated to {url}." # Add implicit wait for load state after navigation self.recorded_steps.append({ "step_id": self._current_step_id, # Use internal counter "action": "wait_for_load_state", "description": "Wait for page navigation to complete", "parameters": {"state": "domcontentloaded"}, # Reasonable default "selector": None, "wait_after_secs": 0 }) self._current_step_id += 1 # Increment after adding implicit step elif action == "click": if not selector: raise ValueError("Missing selector for click action.") self.browser_controller.click(selector) time.sleep(0.5) result["success"] = True result["message"] = f"Clicked element: {selector}." elif action == "type": text = parameters.get("text") if not selector: raise ValueError("Missing selector for type action.") if text is None: raise ValueError("Missing or invalid 'text'.") # Allow empty string? yes. 
self.browser_controller.type(selector, text) result["success"] = True result["message"] = f"Typed into element: {selector}." elif action == "scroll": # Basic scroll support if planned direction = parameters.get("direction") if direction not in ["up", "down"]: raise ValueError("Invalid scroll direction.") self.browser_controller.scroll(direction) result["success"] = True result["message"] = f"Scrolled {direction}." elif action == "check": if not selector: raise ValueError("Missing selector for check action.") self.browser_controller.check(selector) result["success"] = True result["message"] = f"Checked element: {selector}." elif action == "uncheck": if not selector: raise ValueError("Missing selector for uncheck action.") self.browser_controller.uncheck(selector) result["success"] = True result["message"] = f"Unchecked element: {selector}." elif action == "select": option_label = parameters.get("option_label") # option_value = parameters.get("option_value") # If supporting value selection if not selector: raise ValueError("Missing selector for select action.") if not option_label: # and not option_value: # Prioritize label raise ValueError("Missing 'option_label' parameter for select action.") logger.info(f"Selecting option by label '{option_label}' in element: {selector}") # Use the main browser_controller page reference locator = self.browser_controller._get_locator(selector) # Use helper to get locator # select_option can take label, value, or index locator.select_option(label=option_label, timeout=self.browser_controller.default_action_timeout) result["success"] = True result["message"] = f"Selected option '{option_label}' in element: {selector}." elif action == "key_press": keys = parameters.get("keys") if not selector: raise ValueError("Missing selector for key_press action.") if not keys: raise ValueError("Missing 'keys' parameter for key_press action.") self.browser_controller.press(selector, keys) result["success"] = True result["message"] = f"Pressed '{keys}' on element: {selector}." elif action == "drag_and_drop": destination_selector = parameters.get("destination_selector") if not selector: raise ValueError("Missing source selector for drag_and_drop.") if not destination_selector: raise ValueError("Missing 'destination_selector' parameter for drag_and_drop.") self.browser_controller.drag_and_drop(selector, destination_selector) result["success"] = True result["message"] = f"Dragged '{selector}' to '{destination_selector}'." # NOTE: "wait" actions are generally not *executed* during recording, # they are just recorded based on the plan. else: result["message"] = f"Action '{action}' is not directly executable during recording via this method." 
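                # Only navigate, click, type, scroll, check, uncheck, select, key_press and
                # drag_and_drop are executed live during recording; plan-only steps such as
                # "wait" fall through to this branch and are recorded without execution.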
logger.warning(f"[RECORDER_EXEC] {result['message']}") except (PlaywrightError, PlaywrightTimeoutError, ValueError) as e: error_msg = f"Execution during recording failed for action '{action}' on selector '{selector}': {type(e).__name__}: {e}" logger.error(f"[RECORDER_EXEC] {error_msg}", exc_info=False) result["message"] = error_msg result["success"] = False # Optionally save screenshot on execution failure *during recording* try: ts = time.strftime("%Y%m%d_%H%M%S") fname = f"output/recorder_exec_fail_{action}_{ts}.png" self.browser_controller.save_screenshot(fname) logger.info(f"Saved screenshot on recorder execution failure: {fname}") except: pass # Ignore screenshot errors here except Exception as e: error_msg = f"Unexpected Error during recorder execution action '{action}': {type(e).__name__}: {e}" logger.critical(f"[RECORDER_EXEC] {error_msg}", exc_info=True) result["message"] = error_msg result["success"] = False # Log Action Result log_level = logging.INFO if result["success"] else logging.WARNING logger.log(log_level, f"[RECORDER_EXEC_RESULT] Action '{action}' | Success: {result['success']} | Message: {result['message']}") self._add_to_history("Recorder Action Result", {"success": result["success"], "message": result["message"]}) return result # --- New Recorder Core Logic --- def _handle_interactive_step_recording(self, planned_step: Dict[str, Any], suggestion: Dict[str, Any]) -> bool: """ Handles the user interaction loop for a suggested 'click' or 'type' action. Returns True if the step was successfully recorded (or skipped), False if aborted. """ action = suggestion["action"] suggested_selector = suggestion["suggested_selector"] # source selector target_node = suggestion["target_node"] # DOMElementNode # source node destination_selector = suggestion.get("destination_selector") destination_node = suggestion.get("destination_node") parameters = suggestion["parameters"] # Contains index and potentially text reasoning = suggestion.get("reasoning", "N/A") planned_desc = planned_step["description"] final_selector = None performed_action = False user_choice = None parameter_name = None action_recorded = False # Flag to track if we actually recorded the step # --- Automated Mode --- if self.automated_mode: logger.info(f"[Auto Mode] Handling AI suggestion: Action='{action}', Target='{target_node.tag_name}' (Reason: {reasoning})") logger.info(f"[Auto Mode] Suggested Selector: {suggested_selector}") if action == "drag_and_drop": logger.info(f"[Auto Mode] Suggested Target Selector: {destination_selector}") self.browser_controller.clear_highlights() self.browser_controller.highlight_element(suggested_selector, target_node.highlight_index, color="#FFA500", text="AI Suggestion") if action == "drag_and_drop" and destination_selector and destination_node: self.browser_controller.highlight_element(destination_selector, destination_node.highlight_index, color="#0000FF", text="AI Suggestion (Target)") # Directly accept AI suggestion final_selector = suggested_selector final_destination_selector = destination_selector logger.info(f"[Auto Mode] Automatically accepting AI suggestion.") exec_params = parameters.copy() # Start with base params if action == "drag_and_drop": exec_params["destination_selector"] = final_destination_selector # Add target for execution # Execute action on AI's suggested selector exec_result = self._execute_action_for_recording(action, final_selector, exec_params) performed_action = exec_result["success"] if performed_action: # Record the successful step automatically record = 
{ "step_id": self._current_step_id, "action": action, "description": planned_desc, "parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION } if action == "type": # Include text, but no parameterization prompt record["parameters"]["text"] = parameters.get("text", "") elif action == "select": # Ensure 'option_label' from the suggestion's parameters is added if "option_label" in parameters: record["parameters"]["option_label"] = parameters["option_label"] if "option_value" in parameters: record["parameters"]["option_value"] = parameters["option_value"] if "option_index" in parameters: record["parameters"]["option_index"] = parameters["option_index"] elif action == "key_press": record["parameters"]["keys"] = parameters.get("keys", "") elif action == "drag_and_drop": record["parameters"]["destination_selector"] = final_destination_selector self.recorded_steps.append(record) self._current_step_id += 1 logger.info(f"Step {record['step_id']} recorded (AI Suggestion - Automated): {action} on {final_selector}") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded AI suggestion (automated) as step {record['step_id']}") return True # Success else: # AI suggestion execution failed logger.error(f"[Auto Mode] Execution FAILED using AI suggested selector: {exec_result['message']}") # Mark as failed for potential re-planning or retry in the main loop self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "failed", error=f"Automated execution failed: {exec_result['message']}") self.browser_controller.clear_highlights() self.panel.hide_recorder_panel() # Do not abort automatically, let the main loop handle failure/retry logic return True # Indicate handled (failure noted), loop continues # --- Interactive Mode --- else: print("\n" + "="*60) print(f"Planned Step: {planned_desc}") print(f"AI Suggestion: Action='{action}', Target='{target_node.tag_name}' (Reason: {reasoning})") print(f"Suggested Selector: {suggested_selector}") if action == "drag_and_drop": print(f"Suggested Selector (Destination): {destination_selector}") print("="*60) # Highlight suggested element self.browser_controller.clear_highlights() self.browser_controller.highlight_element(suggested_selector, target_node.highlight_index, color="#FFA500", text="AI Suggestion") # Orange for suggestion if action == "drag_and_drop" and destination_selector and destination_node: self.browser_controller.highlight_element(destination_selector, destination_node.highlight_index, color="#0000FF", text="AI Suggestion (Destination)") # Show the UI Panel with options suggestion_display_text = f"'{action}' on <{target_node.tag_name}>" if action == "type": suggestion_display_text += f" with text '{parameters.get('text', '')[:20]}...'" elif action == "key_press": suggestion_display_text += f" with key(s) '{parameters.get('keys', '')}'" elif action == "drag_and_drop": suggestion_display_text += f" to <{destination_node.tag_name if destination_node else 'N/A'}>" self.panel.show_recorder_panel(planned_desc, suggestion_display_text) # Setup listener *after* showing panel, *before* waiting listener_setup = self.browser_controller.setup_click_listener() if not listener_setup: logger.error("Failed to set up click listener, cannot proceed with override.") self.panel.hide_recorder_panel() return False # Abort # --- Wait for User Interaction (Click Override OR Panel Button) --- # Total time budget for interaction TOTAL_INTERACTION_TIMEOUT = 20.0 # e.g., 20 seconds 
total PANEL_WAIT_TIMEOUT = 15.0 # Time to wait for panel *after* click timeout override_selector = None try: logger.debug("Waiting for user click override...") # Wait for click first (short timeout) click_wait_time = TOTAL_INTERACTION_TIMEOUT - PANEL_WAIT_TIMEOUT override_selector = self.browser_controller.wait_for_user_click_or_timeout(click_wait_time) if override_selector: print(f"\n[Recorder] User override detected! Using selector: {override_selector}") user_choice = 'override' # Special internal choice else: # Click timed out, now wait for panel interaction logger.debug("No click override. Waiting for panel interaction...") user_choice = self.panel.wait_for_panel_interaction(PANEL_WAIT_TIMEOUT) if user_choice: print(f"\n[Recorder] User choice via panel: {user_choice}") else: print("\n[Recorder] Timeout waiting for panel interaction. Skipping step.") user_choice = 'skip' # Default to skip on timeout except Exception as e: logger.error(f"Error during user interaction wait: {e}", exc_info=True) user_choice = 'abort' # Abort on unexpected error # --- Process User Choice --- if user_choice == 'override': final_selector = override_selector final_destination_selector = destination_selector if action == "drag_and_drop" else None performed_action = False print(f"Executing original action '{action}' on overridden selector...") exec_params = parameters.copy() if action == "drag_and_drop": exec_params["destination_selector"] = final_destination_selector exec_result = self._execute_action_for_recording(action, final_selector, parameters) performed_action = exec_result["success"] if performed_action: # --- Ask for Parameterization (for 'type' action) --- if action == "type": param_text = parameters.get("text", "") if self.panel.prompt_parameterization_in_panel(param_text): print(f"Parameterize '{param_text[:30]}...'? Enter name in panel or leave blank...") param_choice = self.panel.wait_for_panel_interaction(15.0) # Wait for param submit if param_choice == 'parameterized': parameter_name = self.panel.get_parameterization_result() print(f"Parameter name set to: '{parameter_name}'" if parameter_name else "No parameter name entered.") else: print("Parameterization skipped or timed out.") else: print("Could not show parameterization UI.") # --- Record the override step --- record = { "step_id": self._current_step_id, "action": action, "description": planned_desc, "parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION } if action == "type": record["parameters"]["text"] = parameters.get("text", "") elif action == "select": # Assume original parameters (like option_label) still apply for override if "option_label" in parameters: record["parameters"]["option_label"] = parameters["option_label"] elif action == "key_press": record["parameters"]["keys"] = parameters.get("keys", "") elif action == "drag_and_drop": record["parameters"]["destination_selector"] = final_destination_selector if parameter_name: record["parameters"]["parameter_name"] = parameter_name self.recorded_steps.append(record) self._current_step_id += 1 logger.info(f"Step {record['step_id']} recorded (User Override): {action} on {final_selector}") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded override as step {record['step_id']}") action_recorded = True else: # Override execution failed print(f"WARNING: Execution failed using override selector: {exec_result['message']}") # Ask to skip or abort via panel again? Simpler to just skip here. 
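                    # Unlike the 'accept' branch below, which marks the step as failed so the
                    # main loop can retry it, a failed override execution is simply skipped
                    # (kept simple rather than re-prompting the user through the panel).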
print("Skipping step after failed override execution.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Skipped after failed override execution") elif user_choice == 'accept': print("Accepting AI suggestion.") final_selector = suggested_selector final_destination_selector = destination_selector if action == "drag_and_drop" else None performed_action = False exec_params = parameters.copy() if action == "drag_and_drop": exec_params["destination_selector"] = final_destination_selector exec_result = self._execute_action_for_recording(action, final_selector, parameters) performed_action = exec_result["success"] if performed_action: # --- Ask for Parameterization --- if action == "type": param_text = parameters.get("text", "") if self.panel.prompt_parameterization_in_panel(param_text): print(f"Parameterize '{param_text[:30]}...'? Enter name in panel or leave blank...") param_choice = self.panel.wait_for_panel_interaction(15.0) if param_choice == 'parameterized': parameter_name = self.panel.get_parameterization_result() print(f"Parameter name set to: '{parameter_name}'" if parameter_name else "No parameter name entered.") else: print("Parameterization skipped or timed out.") else: print("Could not show parameterization UI.") # --- Record the accepted AI suggestion --- record = { "step_id": self._current_step_id, "action": action, "description": planned_desc, "parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION } if action == "type": record["parameters"]["text"] = parameters.get("text", "") elif action == "select": if "option_label" in parameters: record["parameters"]["option_label"] = parameters["option_label"] elif action == "key_press": record["parameters"]["keys"] = parameters.get("keys", "") elif action == "drag_and_drop": record["parameters"]["destination_selector"] = final_destination_selector if parameter_name: record["parameters"]["parameter_name"] = parameter_name self.recorded_steps.append(record) self._current_step_id += 1 logger.info(f"Step {record['step_id']} recorded (AI Suggestion): {action} on {final_selector}") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded AI suggestion as step {record['step_id']}") action_recorded = True else: # AI suggestion execution failed print(f"WARNING: Execution failed using AI suggested selector: {exec_result['message']}") # Ask to retry/skip/abort via panel again? Or mark as failed for main loop retry? # Let's mark as failed for retry by the main loop. print("Marking step for retry after execution failure...") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "failed", error=f"Execution failed: {exec_result['message']}") # Action was NOT recorded in this case elif user_choice == 'skip': print("Skipping planned step.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped via panel") elif user_choice == 'abort': print("Aborting recording process.") self._user_abort_recording = True # No need to update task manager status if aborting globally else: # Should not happen with panel, but handle defensively (e.g., timeout resulted in None) print("Invalid choice or timeout. 
Skipping step.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Invalid user choice or timeout") # --- Cleanup UI after interaction --- self.browser_controller.clear_highlights() self.panel.hide_recorder_panel() if listener_setup: self.browser_controller.remove_click_listener() # Ensure listener removed return not self._user_abort_recording # Return True if handled (recorded/skipped/failed for retry), False only on ABORT def _get_llm_assertion_target_index(self, planned_desc: str, dom_context_str: str) -> Tuple[Optional[int], Optional[str]]: """Helper function to ask LLM for the target index for an assertion.""" def _handle_assertion_recording(self, planned_step: Dict[str, Any]) -> bool: """ Handles prompting the user for assertion details based on a 'Verify...' planned step. Returns True if recorded/skipped, False if aborted. """ if self.automated_mode: logger.error("[Auto Mode] Reached manual assertion handler. This indicates verification fallback failed or wasn't triggered. Skipping step.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Skipped (Manual assertion handler reached in auto mode)") return True # Skip planned_desc = planned_step["description"] logger.info(f"Starting interactive assertion definition for: '{planned_desc}'") print("\n" + "="*60) print(f"Planned Step: {planned_desc}") print("Define Assertion via UI Panel...") print("="*60) current_state = "target_selection" # States: target_selection, type_selection, param_input suggested_selector = None final_selector = None assertion_action = None assertion_params = {} llm_target_suggestion_failed = False # Track if initial suggestion failed try: # 1. Identify Target Element (Use LLM again, simplified prompt) # We need a selector for the element to assert against. current_url = self.browser_controller.get_current_url() dom_context_str = "Error getting DOM" if self._latest_dom_state: dom_context_str, _ = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification') prompt = f""" Given the verification step: "{planned_desc}" And the current interactive elements context (with indices). You can interact with non visible elements in the tree too: ```html {dom_context_str} ``` Identify the element index `[index]` most relevant to this verification task. Respond ONLY with a JSON object matching the schema: {{ "index": INDEX_NUMBER_OR_NULL, "reasoning": "OPTIONAL_REASONING_IF_NULL" }} Example Output (Found): {{"index": 5}} Example Output (Not Found): {{"index": null, "reasoning": "Cannot determine a single target element for 'verify presence of error'."}} """ logger.debug(f"[LLM ASSERT PROMPT] Sending prompt for assertion target index:\n{prompt[:500]}...") response_obj = self.llm_client.generate_json(AssertionTargetIndexSchema, prompt) target_index = None llm_reasoning = "LLM did not provide a target index or reasoning." # Default if isinstance(response_obj, AssertionTargetIndexSchema): logger.debug(f"[LLM ASSERT RESPONSE] Parsed index response: {response_obj}") target_index = response_obj.index # Will be None if null in JSON if target_index is None and response_obj.reasoning: llm_reasoning = response_obj.reasoning elif target_index is None: llm_reasoning = "LLM did not identify a target element (index is null)." 
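            # Expected response shapes for AssertionTargetIndexSchema (illustrative, taken
            # from the prompt above):
            #   {"index": 5}
            #   {"index": null, "reasoning": "Cannot determine a single target element."}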
elif isinstance(response_obj, str): # Handle error string logger.error(f"[LLM ASSERT RESPONSE] Failed to get target index JSON: {response_obj}") llm_reasoning = f"LLM error getting target index: {response_obj}" else: # Handle unexpected type logger.error(f"[LLM ASSERT RESPONSE] Unexpected response type for target index: {type(response_obj)}") llm_reasoning = f"Unexpected LLM response type: {type(response_obj)}" target_node = None target_selector = None if target_index is not None: if self._latest_dom_state and self._latest_dom_state.selector_map: target_node = self._latest_dom_state.selector_map.get(target_index) if target_node and target_node.css_selector: suggested_selector = target_node.css_selector print(f"AI suggests target [Index: {target_index}]: <{target_node.tag_name}>") print(f" Selector: `{suggested_selector}`") self.browser_controller.clear_highlights() self.browser_controller.highlight_element(suggested_selector, target_index, color="#0000FF", text="Assert Target?") else: print(f"AI suggested index [{target_index}], but element/selector not found.") llm_target_suggestion_failed = True else: print(f"AI suggested index [{target_index}], but DOM map unavailable.") llm_target_suggestion_failed = True else: print(f"AI could not suggest a target element. Reason: {llm_reasoning}") llm_target_suggestion_failed = True # Mark as failed if no index except Exception as e: logger.error(f"Error getting initial assertion target suggestion: {e}", exc_info=True) print(f"Error getting AI suggestion: {e}") llm_target_suggestion_failed = True # --- User confirms/overrides target selector --- while True: user_choice = None override_selector = None # --- State 1: Target Selection --- if current_state == "target_selection": print("Panel State: Confirm or Override Target Selector.") self.panel.show_assertion_target_panel(planned_desc, suggested_selector) listener_setup = self.browser_controller.setup_click_listener() if not listener_setup: logger.error("Failed to set up click listener for override.") user_choice = 'abort' # Force abort if listener fails else: # Wait for click override OR panel interaction try: logger.debug("Waiting for user click override (Assertion Target)...") override_selector = self.browser_controller.wait_for_user_click_or_timeout(5.0) # 5s for click override if override_selector: print(f"\n[Recorder] User override target detected! Using selector: {override_selector}") user_choice = 'override_target_confirmed' # Internal choice after click else: logger.debug("No click override. Waiting for panel interaction (Assertion Target)...") user_choice = self.panel.wait_for_panel_interaction(15.0) # Wait longer for panel if not user_choice: user_choice = 'skip' # Default to skip on panel timeout except Exception as e: logger.error(f"Error during assertion target interaction wait: {e}", exc_info=True) user_choice = 'abort' # --- Process Target Choice --- self.browser_controller.remove_click_listener() # Remove listener after this stage if user_choice == 'confirm_target': if suggested_selector: final_selector = suggested_selector print(f"Using suggested target: {final_selector}") current_state = "type_selection" # Move to next state continue # Restart loop in new state else: print("Error: Cannot confirm target, no suggestion was available.") # Stay in this state or treat as skip? Let's allow retry. 
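                    # The assertion flow is a small state machine driven by panel choices:
                    #   target_selection -> type_selection -> param_input -> record,
                    # with the 'back_*' choices stepping left and skip/abort leaving the loop.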
current_state = "target_selection" continue elif user_choice == 'override_target_confirmed': # Came from click override if override_selector: final_selector = override_selector print(f"Using override target: {final_selector}") # Try highlighting the user's choice try: self.browser_controller.clear_highlights() self.browser_controller.highlight_element(final_selector, 0, color="#00FF00", text="User Target") except Exception as e: print(f"Warning: Could not highlight user selector '{final_selector}': {e}") current_state = "type_selection" # Move to next state continue # Restart loop in new state else: print("Error: Override click detected but no selector captured.") current_state = "target_selection" # Stay here continue elif user_choice == 'override_target': # Clicked button in panel to enable clicking print("Click the element on the page you want to assert against...") self.panel.hide_recorder_panel() # Hide panel while clicking listener_setup = self.browser_controller.setup_click_listener() if not listener_setup: logger.error("Failed to set up click listener for override.") user_choice = 'abort' else: try: override_selector = self.browser_controller.wait_for_user_click_or_timeout(20.0) # Longer wait for manual click if override_selector: print(f"\n[Recorder] User override target selected: {override_selector}") user_choice = 'override_target_confirmed' # Set internal choice else: print("Timeout waiting for override click. Please try again.") user_choice = None # Force loop restart in target_selection except Exception as e: logger.error(f"Error during manual override click wait: {e}", exc_info=True) user_choice = 'abort' self.browser_controller.remove_click_listener() # Remove listener if user_choice == 'override_target_confirmed': final_selector = override_selector try: self.browser_controller.clear_highlights() self.browser_controller.highlight_element(final_selector, 0, color="#00FF00", text="User Target") except Exception as e: print(f"Warning: Could not highlight user selector: {e}") current_state = "type_selection" continue elif user_choice == 'abort': self._user_abort_recording = True; break # Exit loop else: # Timeout or error during manual click current_state = "target_selection" # Go back to target panel continue elif user_choice == 'skip': print("Skipping assertion definition.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion") break # Exit loop, return True elif user_choice == 'abort': self._user_abort_recording = True; break # Exit loop, return False else: # Includes timeout, None, unexpected values print("Invalid choice or timeout. 
Please select target action.") current_state = "target_selection" # Stay here continue # --- State 2: Type Selection --- elif current_state == "type_selection": if not final_selector: # Should not happen if logic is correct logger.error("Assertion state error: Reached type selection without a final selector.") current_state = "target_selection"; continue # Go back print("Panel State: Select Assertion Type.") self.panel.show_assertion_type_panel(final_selector) user_choice = self.panel.wait_for_panel_interaction(20.0) # Wait for type selection if not user_choice: user_choice = 'skip' # Default to skip on timeout # --- Process Type Choice --- if user_choice.startswith('select_type_'): type_suffix = user_choice.split('select_type_')[-1] # Map suffix to actual action string action_map = { 'text_contains': "assert_text_contains", 'text_equals': "assert_text_equals", 'visible': "assert_visible", 'hidden': "assert_hidden", 'attribute_equals': "assert_attribute_equals", 'element_count': "assert_element_count", 'checked': "assert_checked", 'not_checked': "assert_not_checked", "disabled": "assert_disabled", "enabled": "assert_enabled", "vision_llm": "assert_llm_verification" } assertion_action = action_map.get(type_suffix) if not assertion_action: print(f"Error: Unknown assertion type selected '{type_suffix}'.") current_state = "type_selection"; continue # Ask again print(f"Assertion type selected: {assertion_action}") # Check if parameters are needed needs_params_map = { "assert_text_contains": ["Expected Text"], "assert_text_equals": ["Expected Text"], "assert_attribute_equals": ["Attribute Name", "Expected Value"], "assert_element_count": ["Expected Count"] } if assertion_action in needs_params_map: current_state = "param_input" # Move to param state continue # Restart loop in new state else: assertion_params = {} # No params needed # Proceed directly to recording break # Exit loop to record elif user_choice == 'back_to_target': current_state = "target_selection"; continue # Go back elif user_choice == 'skip': print("Skipping assertion definition.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion") break # Exit loop, return True elif user_choice == 'abort': self._user_abort_recording = True; break # Exit loop, return False else: # Includes timeout, None, unexpected values print("Invalid choice or timeout. 
Please select assertion type.") current_state = "type_selection"; continue # Stay here # --- State 3: Parameter Input --- elif current_state == "param_input": if not final_selector or not assertion_action: # Should not happen logger.error("Assertion state error: Reached param input without selector or action.") current_state = "target_selection"; continue # Go way back print("Panel State: Enter Assertion Parameters.") needs_params_map = { # Redefine here for clarity "assert_text_contains": ["Expected Text"], "assert_text_equals": ["Expected Text"], "assert_attribute_equals": ["Attribute Name", "Expected Value"], "assert_element_count": ["Expected Count"] } param_labels = needs_params_map.get(assertion_action, []) self.panel.show_assertion_params_panel(final_selector, assertion_action, param_labels) user_choice = self.panel.wait_for_panel_interaction(60.0) # Longer timeout for typing if not user_choice: user_choice = 'skip' # Default to skip on timeout # --- Process Param Choice --- if user_choice == 'submit_params': raw_params = self.panel.get_assertion_parameters_from_panel(len(param_labels)) if raw_params is None: print("Error retrieving parameters from panel. Please try again.") current_state = "param_input"; continue # Stay here # Map raw_params (param1, param2) to specific keys assertion_params = {} try: if assertion_action == "assert_text_contains" or assertion_action == "assert_text_equals": assertion_params["expected_text"] = raw_params.get("param1", "") elif assertion_action == "assert_attribute_equals": assertion_params["attribute_name"] = raw_params.get("param1", "") assertion_params["expected_value"] = raw_params.get("param2", "") if not assertion_params["attribute_name"]: raise ValueError("Attribute name cannot be empty.") elif assertion_action == "assert_element_count": count_str = raw_params.get("param1", "") if not count_str.isdigit(): raise ValueError("Expected count must be a number.") assertion_params["expected_count"] = int(count_str) print(f"Parameters submitted: {assertion_params}") break # Exit loop to record except ValueError as ve: print(f"Input Error: {ve}. Please correct parameters.") current_state = "param_input"; continue # Stay here to retry elif user_choice == 'back_to_type': current_state = "type_selection"; continue # Go back elif user_choice == 'abort': self._user_abort_recording = True; break # Exit loop, return False else: # Includes skip, timeout, None, unexpected values print("Skipping assertion definition.") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion parameters") break # Exit loop, return True else: # Should not happen logger.error(f"Assertion state error: Unknown state '{current_state}'. Aborting assertion.") self._user_abort_recording = True; break # Exit loop, return False # --- End of State Machine Loop --- self.panel.hide_recorder_panel() # Ensure panel is hidden # --- Record Step if not Aborted/Skipped --- if not self._user_abort_recording and assertion_action and final_selector: # Check if loop exited normally for recording vs. 
skipping task_status = self.task_manager.subtasks[self.task_manager.current_subtask_index]['status'] if task_status != "skipped": # Only record if not explicitly skipped record = { "step_id": self._current_step_id, "action": assertion_action, "description": planned_desc, # Use original planned description "parameters": assertion_params, "selector": final_selector, "wait_after_secs": 0 # Assertions usually don't need waits after } self.recorded_steps.append(record) self._current_step_id += 1 logger.info(f"Step {record['step_id']} recorded: {assertion_action} on {final_selector}") self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded as assertion step {record['step_id']}") return True else: logger.info("Assertion definition skipped by user.") return True # Skipped successfully elif self._user_abort_recording: logger.warning("Assertion definition aborted by user.") return False # Aborted else: # Loop exited without recording (likely due to skip choice) logger.info("Assertion definition finished without recording (likely skipped).") # Task status should already be 'skipped' from within the loop return True def record(self, feature_description: str) -> Dict[str, Any]: """ Runs the interactive test recording process with LLM verification and dynamic re-planning. """ if not self.is_recorder_mode: logger.error("Cannot run record() method when not in recorder mode.") return {"success": False, "message": "Agent not initialized in recorder mode."} automation_status = "Automated" if self.automated_mode else "Interactive" logger.info(f"--- Starting Test Recording ({automation_status}) --- Feature: {feature_description}") if not self.automated_mode: print(f"\n--- Starting Recording for Feature ({automation_status}) ---\n{feature_description}\n" + "-"*35) start_time = time.time() # Initialize recording status recording_status = { "success": False, "feature": feature_description, "message": "Recording initiated.", "output_file": None, "steps_recorded": 0, "duration_seconds": 0.0, } # Reset state for a new recording session self.history = [] self.recorded_steps = [] self._current_step_id = 1 self.output_file_path = None self._latest_dom_state = None self._user_abort_recording = False self._consecutive_suggestion_failures = 0 self._last_failed_step_index = -1 try: logger.debug("[RECORDER] Starting browser controller...") self.browser_controller.start() self.browser_controller.clear_console_messages() self.task_manager.set_main_task(feature_description) logger.debug("[RECORDER] Planning initial steps...") self._plan_subtasks(feature_description) # Generates the list of planned steps if not self.task_manager.subtasks: recording_status["message"] = "❌ Recording Planning Failed: No steps generated." 
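                # No steps could be planned for this feature; raising aborts the recording
                # attempt and the failure message set above is what the caller ultimately
                # sees in the returned recording_status.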
                raise ValueError(recording_status["message"]) # Use ValueError for planning failure

            logger.info(f"Beginning interactive recording for {len(self.task_manager.subtasks)} initial planned steps...")

            iteration_count = 0 # General loop counter for safety
            MAX_RECORDING_ITERATIONS = self.max_iterations * 2 # Allow more iterations for potential recovery steps

            while iteration_count < MAX_RECORDING_ITERATIONS:
                iteration_count += 1
                planned_steps_count = len(self.task_manager.subtasks) # Get current count
                current_planned_task = self.task_manager.get_next_subtask()

                if self._user_abort_recording: # Abort check
                    abort_reason = self._abort_reason if self.automated_mode else "the user requested an abort"
                    recording_status["message"] = f"Recording aborted by {'AI' if self.automated_mode else 'user'} because {abort_reason}."
                    logger.warning(recording_status["message"])
                    break

                if not current_planned_task:
                    # Check if finished normally or failed planning/retries
                    if self.task_manager.is_complete():
                        # Check if ANY task failed permanently
                        perm_failed_tasks = [t for t in self.task_manager.subtasks if t['status'] == 'failed' and t['attempts'] > self.task_manager.max_retries_per_subtask]
                        if perm_failed_tasks:
                            logger.error(f"Permanently failed planned steps: {perm_failed_tasks}")
                            first_failed_idx = self.task_manager.subtasks.index(perm_failed_tasks[0])
                            failed_task = perm_failed_tasks[0]
                            recording_status["message"] = f"Recording process completed with failures. First failed step #{first_failed_idx+1}: {failed_task['description']} (Error: {failed_task['result']})"
                            recording_status["success"] = False # Mark as failed overall
                            logger.error(recording_status["message"])
                        elif all(t['status'] in ['done', 'skipped'] for t in self.task_manager.subtasks):
                            logger.info("All planned steps processed or skipped successfully.")
                            recording_status["message"] = "Recording process completed."
                            recording_status["success"] = True # Mark as success ONLY if no permanent failures
                        else: # Should not happen if is_complete is true and perm_failed is empty
                            recording_status["message"] = "Recording finished, but final state inconsistent."
                            recording_status["success"] = False
                            logger.warning(recording_status["message"])
                    else:
                        recording_status["message"] = "Recording loop ended unexpectedly (no actionable tasks found)."
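                        # get_next_subtask() returned nothing even though the plan is not
                        # marked complete; treat this inconsistent state as a failure.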
recording_status["success"] = False logger.error(recording_status["message"]) break # Exit loop # Add index to the task dictionary for easier reference current_task_index = self.task_manager.current_subtask_index current_planned_task['index'] = current_task_index logger.info(f"\n===== Processing Planned Step {current_task_index + 1}/{planned_steps_count} (Attempt {current_planned_task['attempts']}) =====") if not self.automated_mode: print(f"\nProcessing Step {current_task_index + 1}: {current_planned_task['description']}") # --- Reset Consecutive Failure Counter if step index changes --- if self._last_failed_step_index != current_task_index: self._consecutive_suggestion_failures = 0 self._last_failed_step_index = current_task_index # Update last processed index # --- State Gathering --- logger.info("Gathering browser state and structured DOM...") current_url = "Error: Could not get URL" dom_context_str = "Error: Could not process DOM" static_id_map = {} screenshot_bytes = None # Initialize screenshot bytes self._latest_dom_state = None self.browser_controller.clear_highlights() # Clear previous highlights self.panel.hide_recorder_panel() try: current_url = self.browser_controller.get_current_url() # Always try to get DOM state self._latest_dom_state = self.browser_controller.get_structured_dom(highlight_all_clickable_elements=False, viewport_expansion=-1) if self._latest_dom_state and self._latest_dom_state.element_tree: dom_context_str, static_id_map = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification') self._last_static_id_map = static_id_map else: dom_context_str = "Error processing DOM structure." self._last_static_id_map = {} logger.error("[RECORDER] Failed to get valid DOM state.") # Get screenshot, especially useful for verification/re-planning screenshot_bytes = self.browser_controller.take_screenshot() except Exception as e: logger.error(f"Failed to gather browser state/DOM/Screenshot: {e}", exc_info=True) dom_context_str = f"Error gathering state: {e}" # Allow proceeding, LLM might handle navigation or re-planning might trigger # --- Handle Step Type --- planned_step_desc_lower = current_planned_task['description'].lower() step_handled_internally = False # Flag to indicate if step logic was fully handled here # --- 1. Verification Step --- if planned_step_desc_lower.startswith(("verify", "assert")): previous_error = current_planned_task.get("error") # Get validation error from last attempt logger.info("Handling verification step using LLM...") verification_result = self._get_llm_verification( verification_description=current_planned_task['description'], current_url=current_url, dom_context_str=dom_context_str, static_id_map=static_id_map, screenshot_bytes=screenshot_bytes, previous_error=previous_error ) if verification_result: # Handle user confirmation & recording based on LLM result handled_ok = self._handle_llm_verification(current_planned_task, verification_result) if not handled_ok and self._user_abort_recording: logger.warning("User aborted during verification handling.") break else: # LLM verification failed, fallback to manual failure_reason = "LLM call/parse failed for verification." logger.error(f"[Verification] {failure_reason}") if self.automated_mode: logger.error("[Auto Mode] LLM verification call failed. Skipping step.") self.task_manager.update_subtask_status(current_task_index, "skipped", result="Skipped (LLM verification failed)") else: print("AI verification failed. 

                # --- 2. Navigation Step ---
                elif planned_step_desc_lower.startswith("navigate to"):
                    try:
                        parts = re.split("navigate to", current_planned_task['description'], maxsplit=1, flags=re.IGNORECASE)
                        if len(parts) > 1 and parts[1].strip():
                            url = parts[1].strip()
                            # print(f"Action: Navigate to {url}")
                            exec_result = self._execute_action_for_recording("navigate", None, {"url": url})
                            if exec_result["success"]:
                                # Record navigation step + implicit wait
                                nav_step_id = self._current_step_id
                                self.recorded_steps.append({
                                    "step_id": nav_step_id,
                                    "action": "navigate",
                                    "description": current_planned_task['description'],
                                    "parameters": {"url": url},
                                    "selector": None,
                                    "wait_after_secs": 0  # Wait handled by wait_for_load_state
                                })
                                self._current_step_id += 1
                                self.recorded_steps.append({  # Add implicit wait
                                    "step_id": self._current_step_id,
                                    "action": "wait_for_load_state",
                                    "description": "Wait for page navigation",
                                    "parameters": {"state": "domcontentloaded"},
                                    "selector": None,
                                    "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION
                                })
                                self._current_step_id += 1
                                logger.info(f"Steps {nav_step_id}, {self._current_step_id-1} recorded: navigate and wait")
                                self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded navigation")
                                self._consecutive_suggestion_failures = 0  # Reset failure counter on success
                            else:
                                # NAVIGATION FAILED - Potential trigger for re-planning
                                logger.error(f"Navigation failed: {exec_result['message']}")
                                reason = f"Navigation to '{url}' failed: {exec_result['message']}"
                                # Try re-planning instead of immediate skip/abort
                                if self._trigger_re_planning(current_planned_task, reason):
                                    logger.info("Re-planning successful, continuing with recovery steps.")
                                    # Recovery steps inserted, loop will pick them up
                                else:
                                    # Re-planning failed or user aborted/skipped recovery
                                    if not self._user_abort_recording:  # Check if abort wasn't the reason
                                        logger.warning("Re-planning failed or declined after navigation failure. Skipping original step.")
                                        # Status already updated by _trigger_re_planning if skipped/aborted
                        else:
                            raise ValueError("Could not parse URL after 'navigate to'.")
                    except Exception as nav_e:
                        logger.error(f"Error processing navigation step '{current_planned_task['description']}': {nav_e}")
                        reason = f"Error processing navigation step: {nav_e}"
                        if self._trigger_re_planning(current_planned_task, reason):
                            logger.info("Re-planning successful after navigation processing error.")
                        else:
                            if not self._user_abort_recording:
                                logger.warning("Re-planning failed/declined. Marking original navigation step failed.")
                                self.task_manager.update_subtask_status(current_task_index, "failed", error=reason)  # Mark as failed if no recovery
                    step_handled_internally = True  # Navigation handled
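                # Illustrative shape of the two records appended for a successful "navigate to" step
                # (values are hypothetical):
                #   {"step_id": 1, "action": "navigate", "description": "Navigate to https://example.com",
                #    "parameters": {"url": "https://example.com"}, "selector": None, "wait_after_secs": 0}
                #   {"step_id": 2, "action": "wait_for_load_state", "description": "Wait for page navigation",
                #    "parameters": {"state": "domcontentloaded"}, "selector": None,
                #    "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION}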

                # --- 3. Scroll Step ---
                elif planned_step_desc_lower.startswith("scroll"):
                    try:
                        direction = "down" if "down" in planned_step_desc_lower else "up" if "up" in planned_step_desc_lower else None
                        if direction:
                            exec_result = self._execute_action_for_recording("scroll", None, {"direction": direction})
                            if exec_result["success"]:
                                self.recorded_steps.append({
                                    "step_id": self._current_step_id,
                                    "action": "scroll",
                                    "description": current_planned_task['description'],
                                    "parameters": {"direction": direction},
                                    "selector": None,
                                    "wait_after_secs": 0.2
                                })
                                self._current_step_id += 1
                                logger.info(f"Step {self._current_step_id-1} recorded: scroll {direction}")
                                self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded scroll")
                                self._consecutive_suggestion_failures = 0  # Reset failure counter
                            else:
                                self.task_manager.update_subtask_status(current_task_index, "skipped", result="Optional scroll failed")
                        else:
                            self.task_manager.update_subtask_status(current_task_index, "skipped", result="Unknown scroll direction")
                    except Exception as scroll_e:
                        logger.error(f"Error handling scroll step: {scroll_e}")
                        self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Scroll step failed: {scroll_e}")  # Mark failed
                    step_handled_internally = True  # Scroll handled

                # --- 4. Handle Visual Baseline Capture Step ---
                elif planned_step_desc_lower.startswith("visually baseline"):
                    planned_desc = current_planned_task['description']
                    logger.info(f"Handling planned step: '{planned_desc}'")
                    target_description = planned_step_desc_lower.replace("visually baseline the", "").strip()
                    default_baseline_id = re.sub(r'\s+', '_', target_description)  # Generate default ID
                    default_baseline_id = re.sub(r'[^\w\-]+', '', default_baseline_id)[:50]  # Sanitize
                    baseline_id = None
                    target_selector = None
                    capture_type = 'page'  # Default to full page
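                    # Illustrative derivation (hypothetical step text): for the planned step
                    # "Visually baseline the login page", target_description becomes "login page"
                    # and default_baseline_id becomes "login_page" after the two re.sub calls above.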

                    # --- Mode-dependent handling ---
                    if self.automated_mode:
                        # Note: avoid rebinding the built-in print() here (a local assignment would make
                        # every earlier print() in this method fail); log the message directly instead.
                        baseline_id = default_baseline_id or f"baseline_{self._current_step_id}"  # Ensure ID exists
                        logger.info(f"[Auto Mode Baseline] Capturing baseline: '{baseline_id}' (Full Page - Default)")
                        # Note: Automated mode currently only supports full page baselines.
                        # To support element baselines, we'd need AI to suggest selector or pre-define targets.
                    else:  # Interactive Mode
                        print("\n" + "=" * 60)
                        print(f"Planned Step: {planned_desc}")
                        baseline_id = input(f"Enter Baseline ID (default: '{default_baseline_id}'): ").strip() or default_baseline_id
                        capture_choice = input("Capture Full Page (P) or Specific Element (E)? [P]: ").strip().lower()
                        if capture_choice == 'e':
                            capture_type = 'element'
                            print("Click the element to capture as baseline...")
                            self.browser_controller.clear_highlights()  # Clear any previous
                            listener_setup = self.browser_controller.setup_click_listener()
                            if listener_setup:
                                try:
                                    target_selector = self.browser_controller.wait_for_user_click_or_timeout(20.0)
                                    if target_selector:
                                        print(f"Element selected. Using selector: {target_selector}")
                                        # Highlight the selected element briefly
                                        try:
                                            self.browser_controller.highlight_element(target_selector, 0, color="#00FF00", text="Baseline Element")
                                            time.sleep(1.5)  # Show highlight briefly
                                        except Exception:
                                            pass  # Ignore highlight errors
                                    else:
                                        print("No element selected (timeout). Defaulting to Full Page.")
                                        capture_type = 'page'
                                except Exception as e:
                                    logger.error(f"Error during element selection for baseline: {e}")
                                    print("Error selecting element. Defaulting to Full Page.")
                                    capture_type = 'page'
                                self.browser_controller.remove_click_listener()  # Clean up listener
                            else:
                                print("Error setting up click listener. Defaulting to Full Page.")
                                capture_type = 'page'
                        else:  # Default to Page
                            print("Capturing Full Page baseline.")
                            capture_type = 'page'

                    # --- Capture and Save Baseline ---
                    capture_success = False
                    final_screenshot_bytes = None
                    if capture_type == 'element' and target_selector:
                        final_screenshot_bytes = self.browser_controller.take_screenshot_element(target_selector)
                        if final_screenshot_bytes:
                            capture_success = self._save_visual_baseline(baseline_id, final_screenshot_bytes, selector=target_selector)
                        else:
                            logger.error(f"Failed to capture element screenshot for baseline '{baseline_id}' selector '{target_selector}'.")
                            if not self.automated_mode:
                                print("Error: Failed to capture element screenshot.")
                    else:  # Full page
                        # Use the screenshot already taken during state gathering if available
                        final_screenshot_bytes = screenshot_bytes
                        if final_screenshot_bytes:
                            capture_success = self._save_visual_baseline(baseline_id, final_screenshot_bytes, selector=None)
                        else:
                            logger.error(f"Failed to capture full page screenshot for baseline '{baseline_id}'.")
                            if not self.automated_mode:
                                print("Error: Failed to capture full page screenshot.")

                    # --- Record assert_visual_match step ---
                    if capture_success:
                        record = {
                            "step_id": self._current_step_id,
                            "action": "assert_visual_match",  # The corresponding execution action
                            "description": planned_desc,  # Use the baseline description
                            "parameters": {"baseline_id": baseline_id},
                            "selector": target_selector,  # Null for page, selector for element
                            "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION
                        }
                        self.recorded_steps.append(record)
                        self._current_step_id += 1
                        logger.info(f"Step {record['step_id']} recorded: assert_visual_match for baseline '{baseline_id}' ({'Element' if target_selector else 'Page'})")
                        self.task_manager.update_subtask_status(current_task_index, "done", result=f"Recorded baseline '{baseline_id}'")
                        self._consecutive_suggestion_failures = 0
                    else:
                        # Baseline capture/save failed
                        logger.error(f"Failed to save baseline '{baseline_id}'. Skipping recording.")
                        if not self.automated_mode:
                            print(f"Failed to save baseline '{baseline_id}'. Skipping.")
                        self.task_manager.update_subtask_status(current_task_index, "skipped", result="Failed to save baseline")
                    step_handled_internally = True  # Baseline capture handled
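                # The wait parser below recognizes descriptions such as (hypothetical examples):
                #   "Wait for 3 seconds"                        -> {"timeout_seconds": 3.0}
                #   "Wait for element '#spinner' to be hidden"  -> {"state": "hidden", "selector": "#spinner"}
                #   "Wait for url 'https://example.com/done'"   -> {"url": "https://example.com/done"}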

                # --- 5. Wait Step ---
                elif planned_step_desc_lower.startswith("wait for"):
                    logger.info(f"Handling planned wait step: {current_planned_task['description']}")
                    wait_params = {}
                    wait_selector = None
                    wait_desc = current_planned_task['description']
                    parsed_ok = False  # Flag to check if parameters were parsed
                    try:
                        # Try to parse common patterns
                        time_match = re.search(r"wait for (\d+(\.\d+)?)\s+seconds?", planned_step_desc_lower)
                        # Updated regex to be more flexible with optional 'element' word and quotes
                        element_match = re.search(r"wait for (?:element\s*)?\'?(.*?)\'?\s+to be\s+(\w+)", planned_step_desc_lower)
                        # Require at least one character for the URL so the lazy/optional pattern cannot capture an empty string
                        url_match = re.search(r"wait for url\s*\'?([^\']+)\'?", planned_step_desc_lower)

                        if time_match:
                            wait_params["timeout_seconds"] = float(time_match.group(1))
                            wait_action = "wait"
                            parsed_ok = True
                        elif element_match:
                            element_desc_for_selector = element_match.group(1).strip()
                            state = element_match.group(2).strip()
                            wait_params["state"] = state
                            # --- Attempt to resolve selector during recording ---
                            # For simplicity, let's *assume* the description IS the selector for now.
                            # A better approach would use LLM or prompt user.
                            if element_desc_for_selector.startswith(('#', '.', '[')) or '/' in element_desc_for_selector:
                                wait_selector = element_desc_for_selector
                                logger.info(f"Using description '{wait_selector}' directly as selector for wait.")
                            else:
                                logger.warning(f"Cannot directly use '{element_desc_for_selector}' as selector. Wait step might fail execution. Recording intent.")
                                # Record without selector, executor might fail unless enhanced
                                wait_desc += f" (Element Description: {element_desc_for_selector})"  # Add detail to desc
                            wait_params["selector"] = wait_selector  # Store selector (or None) in params for execution call
                            wait_action = "wait"  # Still use generic wait
                            parsed_ok = True
                        elif url_match:
                            wait_params["url"] = url_match.group(1)  # URL pattern
                            wait_action = "wait"  # Use generic wait
                            parsed_ok = True
                        else:
                            logger.warning(f"Could not parse wait parameters from: '{current_planned_task['description']}'. Skipping.")
                            self.task_manager.update_subtask_status(current_task_index, "skipped", result="Unknown wait format")
                            wait_action = None
                            parsed_ok = False

                        if parsed_ok and wait_action:
                            # --- Execute the wait ---
                            logger.info(f"Executing wait action: {wait_params}")
                            wait_exec_result = self.browser_controller.wait(**wait_params)
                            if wait_exec_result["success"]:
                                logger.info("Wait execution successful during recording.")
                                # --- Record the step AFTER successful execution ---
                                self.recorded_steps.append({
                                    "step_id": self._current_step_id,
                                    "action": wait_action,
                                    "description": wait_desc,
                                    "parameters": wait_params,  # Use the parsed params
                                    "selector": wait_selector,  # Record selector if found
                                    "wait_after_secs": 0
                                })
                                self._current_step_id += 1
                                logger.info(f"Step {self._current_step_id-1} recorded: {wait_action} with params {wait_params}")
                                self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded and executed wait step")
                            else:
                                # Wait failed during recording
                                logger.error(f"Wait execution FAILED during recording: {wait_exec_result['message']}")
                                # Decide how to handle: Skip? Fail? Abort? Let's skip for now.
                                if not self.automated_mode:
                                    cont = input("Wait failed. Skip this step (S) or Abort recording (A)? [S]: ").strip().lower()
                                    if cont == 'a':
                                        self._user_abort_recording = True
                                        self._abort_reason = "User aborted after wait failed."
                                    else:
                                        self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Wait failed during recording: {wait_exec_result['message']}")
                                else:  # Automated mode - just skip
                                    self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Wait failed during recording: {wait_exec_result['message']}")
                    except Exception as wait_e:
                        logger.error(f"Error parsing or executing wait step: {wait_e}")
                        # Mark as failed if parsing/execution error occurred
                        self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Wait step processing failed: {wait_e}")
                    step_handled_internally = True

                # --- 6. Default: Assume Interactive Click/Type ---
                if not step_handled_internally:
                    # --- AI Suggestion ---
                    logger.debug(dom_context_str)  # DOM context dump is diagnostic output, not a critical event
                    ai_suggestion = self._determine_action_and_selector_for_recording(
                        current_planned_task, current_url, dom_context_str
                    )
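                    # Illustrative (hypothetical) shape of a suggestion consumed by the branches below;
                    # the exact parameter keys depend on the suggested action:
                    #   {"action": "click", "parameters": {"index": 12}, "reasoning": "..."}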
logger.error(f"AI suggestion failed for step {current_task_index + 1}: {reason}") self._consecutive_suggestion_failures += 1 # Check if we should try re-planning due to repeated failures if self._consecutive_suggestion_failures > self.task_manager.max_retries_per_subtask: logger.warning(f"Maximum suggestion retries exceeded for step {current_task_index + 1}. Triggering re-planning.") replan_reason = f"AI failed to suggest an action/selector repeatedly for step: '{current_planned_task['description']}'. Last reason: {reason}" if self._trigger_re_planning(current_planned_task, replan_reason): logger.info("Re-planning successful after suggestion failures.") # Loop continues with recovery steps else: # Re-planning failed or user aborted/skipped if not self._user_abort_recording: logger.error("Re-planning failed/declined. Marking original step as failed permanently.") self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Failed permanently after repeated suggestion errors and failed re-planning attempt. Last reason: {reason}", force_update=True) else: # Mark as failed for normal retry by TaskManager self.task_manager.update_subtask_status(current_task_index, "failed", error=reason) # Continue loop, TaskManager will offer retry if possible elif ai_suggestion.get("action") == "action_not_applicable": reason = ai_suggestion.get("reasoning", "Step not a click/type.") logger.info(f"Planned step '{current_planned_task['description']}' determined not applicable by AI. Skipping. Reason: {reason}") # Could this trigger re-planning? Maybe if it happens unexpectedly. For now, treat as skip. self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Skipped non-interactive step ({reason})") self._consecutive_suggestion_failures = 0 # Reset counter on skip elif ai_suggestion.get("action") in ["click", "type", "check", "uncheck", "select", "key_press", "drag_and_drop"]: # --- Handle Interactive Step (Confirmation/Override/Execution) --- # This method now returns True if handled (recorded, skipped, retry requested), False if aborted # It also internally updates task status based on outcome. handled_ok = self._handle_interactive_step_recording(current_planned_task, ai_suggestion) if not handled_ok and self._user_abort_recording: logger.warning("User aborted during interactive step handling.") break # Exit main loop immediately on abort # Check if the step failed execution and might need re-planning current_task_status = self.task_manager.subtasks[current_task_index]['status'] if current_task_status == 'failed': # _handle_interactive_step_recording marks failed if execution fails and user doesn't skip/abort # Check if it was an execution failure (not just suggestion retry) error_msg = self.task_manager.subtasks[current_task_index].get('error', '') if "Execution failed" in error_msg: # Check for execution failure messages logger.warning(f"Execution failed for step {current_task_index + 1}. Triggering re-planning.") replan_reason = f"Action execution failed for step '{current_planned_task['description']}'. Error: {error_msg}" if self._trigger_re_planning(current_planned_task, replan_reason): logger.info("Re-planning successful after execution failure.") # Loop continues with recovery steps else: # Re-planning failed or declined if not self._user_abort_recording: logger.error("Re-planning failed/declined after execution error. 
                                        # Task already marked as failed by _handle_interactive_step_recording
                            # else: It was likely marked failed to retry suggestion - allow normal retry flow
                        elif current_task_status == 'done' or current_task_status == 'skipped':
                            self._consecutive_suggestion_failures = 0  # Reset failure counter on success/skip

                    else:  # Should not happen
                        logger.error(f"Unexpected AI suggestion action: {ai_suggestion.get('action')}. Skipping step.")
                        self.task_manager.update_subtask_status(current_task_index, "failed", error="Unexpected AI action suggestion")

                # --- Cleanup after processing a step attempt ---
                self.browser_controller.clear_highlights()
                # Listener removal is handled within _handle_interactive_step_recording and wait_for_user_click...
                # self.browser_controller.remove_click_listener()  # Ensure listener is off - redundant?

                # Small delay between steps/attempts
                if not self._user_abort_recording:  # Don't delay if aborting
                    time.sleep(0.3)

            # --- Loop End ---
            if not recording_status["success"] and iteration_count >= MAX_RECORDING_ITERATIONS:
                recording_status["message"] = f"⚠️ Recording Stopped: Maximum iterations ({MAX_RECORDING_ITERATIONS}) reached."
                recording_status["success"] = False  # Ensure max iterations means failure
                logger.warning(recording_status["message"])

            # --- Final Save ---
            if not self._user_abort_recording and self.recorded_steps:
                try:
                    if recording_status.get("success", False):  # Only check if currently marked as success
                        perm_failed_tasks_final = [t for t in self.task_manager.subtasks if t['status'] == 'failed' and t['attempts'] > self.task_manager.max_retries_per_subtask]
                        if perm_failed_tasks_final:
                            recording_status["success"] = False  # Override success if any task failed
                            recording_status["message"] = recording_status["message"].replace("completed.", "completed with failures.")  # Adjust message
                            logger.warning("Overriding overall success status to False due to permanently failed steps found.")

                    output_data = {
                        "test_name": f"{feature_description[:50]}_Test",
                        "feature_description": feature_description,
                        "recorded_at": datetime.utcnow().isoformat() + "Z",
                        "console_logs": self.browser_controller.console_messages,
                        "steps": self.recorded_steps
                    }
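                    # Illustrative (hypothetical values) shape of the JSON document written below:
                    #   {"test_name": "Login happy path_Test",
                    #    "feature_description": "Login happy path",
                    #    "recorded_at": "2024-01-01T12:00:00.000000Z",
                    #    "console_logs": [],
                    #    "steps": [{"step_id": 1, "action": "navigate", ...}]}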
                    recording_status["console_messages"] = self.browser_controller.console_messages
                    ts = time.strftime("%Y%m%d_%H%M%S")
                    safe_feature_name = re.sub(r'[^\w\-]+', '_', feature_description)[:50]
                    if self.file_name is None:
                        self.file_name = f"test_{safe_feature_name}_{ts}.json"
                    else:
                        self.file_name = self.file_name + f"{safe_feature_name}_{ts}_test.json"
                    output_dir = "output"
                    if not os.path.exists(output_dir):
                        os.makedirs(output_dir)
                    self.output_file_path = os.path.join(output_dir, self.file_name)
                    with open(self.output_file_path, 'w', encoding='utf-8') as f:
                        json.dump(output_data, f, indent=2, ensure_ascii=False)

                    recording_status["output_file"] = self.output_file_path
                    recording_status["steps_recorded"] = len(self.recorded_steps)
                    # Set success only if we saved something and didn't explicitly fail/abort
                    if recording_status["success"]:
                        logger.info(f"Recording successfully saved to: {self.output_file_path}")
                    else:
                        logger.warning(f"Recording finished with status: {'Failed' if not self._user_abort_recording else 'Aborted'}. Saved {len(self.recorded_steps)} steps to: {self.output_file_path}. Message: {recording_status.get('message')}")
                except Exception as save_e:
                    logger.error(f"Failed to save recorded steps to JSON: {save_e}", exc_info=True)
                    recording_status["message"] = f"Failed to save recording: {save_e}"
                    recording_status["success"] = False
            elif self._user_abort_recording:
                if not self._abort_reason:
                    recording_status["message"] = "Recording aborted by user. No file saved."
                recording_status["success"] = False
            else:  # No steps recorded
                recording_status["message"] = "No steps were recorded."
                recording_status["success"] = False

        except ValueError as e:  # Catch planning errors specifically
            logger.critical(f"Test planning failed: {e}", exc_info=True)
            recording_status["message"] = f"❌ Test Planning Failed: {e}"
            recording_status["success"] = False  # Ensure failure state
        except Exception as e:
            logger.critical(f"A critical unexpected error occurred during recording: {e}", exc_info=True)
            recording_status["message"] = f"❌ Critical Error during recording: {e}"
            recording_status["success"] = False  # Ensure failure state
        finally:
            logger.info("--- Ending Test Recording ---")
            # Ensure cleanup even if browser wasn't started fully
            if hasattr(self, 'browser_controller') and self.browser_controller:
                self.browser_controller.clear_highlights()
                self.browser_controller.remove_click_listener()  # Attempt removal
                self.panel.remove_recorder_panel()
                self.browser_controller.close()
            end_time = time.time()
            recording_status["duration_seconds"] = round(end_time - start_time, 2)
            logger.info(f"Recording process finished in {recording_status['duration_seconds']:.2f} seconds.")
            logger.info(f"Final Recording Status: {'Success' if recording_status['success'] else 'Failed/Aborted'} - {recording_status['message']}")
            if recording_status.get("output_file"):
                logger.info(f"Output file: {recording_status.get('output_file')}")

        return recording_status  # Return the detailed status dictionary
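    # Illustrative (hypothetical values) shape of the dictionary returned by record():
    #   {"success": True, "feature": "Login happy path", "message": "Recording process completed.",
    #    "output_file": "output/test_Login_happy_path_20240101_120000.json", "steps_recorded": 5,
    #    "duration_seconds": 42.1, "console_messages": [...]}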
