# /src/recorder_agent.py
import json
from importlib import resources
import logging
import time
import re
from patchright.sync_api import Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError
from typing import Dict, Any, Optional, List, Tuple, Union, Literal
import random
import os
import threading # For timer
from datetime import datetime, timezone
from pydantic import BaseModel, Field
from PIL import Image
import io
# Use relative imports within the package
from ..browser.browser_controller import BrowserController
from ..browser.panel.panel import Panel
from ..llm.llm_client import LLMClient
from ..core.task_manager import TaskManager
from ..dom.views import DOMState, DOMElementNode, SelectorMap # Import DOM types
# Configure logger
logger = logging.getLogger(__name__)
# --- Recorder Settings ---
INTERACTIVE_TIMEOUT_SECS = 0 # Time for user to override AI suggestion
DEFAULT_WAIT_AFTER_ACTION = 0.5 # Default small wait added after recorded actions
# --- End Recorder Settings ---
class PlanSubtasksSchema(BaseModel):
"""Schema for the planned subtasks list."""
planned_steps: List[str] = Field(..., description="List of planned test step descriptions as strings.")
class LLMVerificationParamsSchema(BaseModel):
"""Schema for parameters within a successful verification."""
expected_text: Optional[str] = Field(None, description="Expected text for equals/contains assertions.")
attribute_name: Optional[str] = Field(None, description="Attribute name for attribute_equals assertion.")
expected_value: Optional[str] = Field(None, description="Expected value for attribute_equals assertion.")
expected_count: Optional[int] = Field(None, description="Expected count for element_count assertion.")
class LLMVerificationSchema(BaseModel):
"""Schema for the result of an LLM verification step."""
verified: bool = Field(..., description="True if the condition is met, False otherwise.")
assertion_type: Optional[Literal[
'assert_text_equals',
'assert_text_contains',
'assert_visible',
'assert_llm_verification',
'assert_hidden',
'assert_attribute_equals',
'assert_element_count',
'assert_checked',
'assert_enabled',
'assert_disabled',
'assert_not_checked'
]] = Field(None, description="Required if verified=true. Type of assertion suggested, reflecting the *actual observed state*.")
element_index: Optional[int] = Field(None, description="Index of the *interactive* element [index] from context that might *also* relate to the verification (e.g., the button just clicked), if applicable. Set to null if verification relies solely on a static element or non-indexed element.")
verification_selector: Optional[str] = Field(None, description="Final CSS selector for the verifying element (generated by the system based on index or static_id). LLM should output null for this field.")
verification_static_id: Optional[str] = Field(None, description="Temporary ID (e.g., 's12') of the static element from context that confirms the verification, if applicable. Use this *instead* of verification_selector for static elements.")
parameters: Optional[LLMVerificationParamsSchema] = Field(default_factory=LLMVerificationParamsSchema, description="Parameters for the assertion based on the *actual observed state*. Required if assertion type needs params (e.g., assert_text_equals).")
reasoning: str = Field(..., description="Explanation for the verification result, explaining how the intent is met or why it failed. If verified=true, justify the chosen selector and parameters.")
class ReplanSchema(BaseModel):
"""Schema for recovery steps or abort action during re-planning."""
recovery_steps: Optional[List[str]] = Field(None, description="List of recovery step descriptions (1-3 steps), if recovery is possible.")
action: Optional[Literal["abort"]] = Field(None, description="Set to 'abort' if recovery is not possible/safe.")
reasoning: Optional[str] = Field(None, description="Reasoning, especially required if action is 'abort'.")
class RecorderSuggestionParamsSchema(BaseModel):
"""Schema for parameters within a recorder action suggestion."""
index: Optional[int] = Field(None, description="Index of the target element from context (required for click/type/check/uncheck/key_press/drag_and_drop source).")
keys: Optional[str] = Field(None, description="Key(s) to press (required for key_press action). E.g., 'Enter', 'Control+A'.")
destination_index: Optional[int] = Field(None, description="Index of the target element for drag_and_drop action.")
option_label: Optional[str] = Field(None, description="Visible text/label of the option to select (**required for select action**)")
text: Optional[str] = Field(None, description="Text to type (required for type action).")
class RecorderSuggestionSchema(BaseModel):
"""Schema for the AI's suggestion for a click/type action during recording."""
action: Literal["click", "type", "select", "check", "uncheck", "key_press", "drag_and_drop", "action_not_applicable", "suggestion_failed"] = Field(..., description="The suggested browser action or status.")
parameters: RecorderSuggestionParamsSchema = Field(default_factory=RecorderSuggestionParamsSchema, description="Parameters for the action (index, text, option_label).")
reasoning: str = Field(..., description="Explanation for the suggestion.")
class AssertionTargetIndexSchema(BaseModel):
"""Schema for identifying the target element index for a manual assertion."""
index: Optional[int] = Field(None, description="Index of the most relevant element from context, or null if none found/identifiable.")
reasoning: Optional[str] = Field(None, description="Reasoning, especially if index is null.")
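# Illustrative payloads (hypothetical values, not from any real run) that would validate
# against the schemas above via Pydantic v2, e.g.:
#
#   RecorderSuggestionSchema.model_validate({
#       "action": "type",
#       "parameters": {"index": 4, "text": "testuser"},
#       "reasoning": "The username input is interactive element [4]."
#   })
#
#   LLMVerificationSchema.model_validate({
#       "verified": True,
#       "assertion_type": "assert_text_contains",
#       "verification_static_id": "s23",
#       "parameters": {"expected_text": "logged in"},
#       "reasoning": "Static element s23 shows the success banner."
#   })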
class WebAgent:
"""
Orchestrates AI-assisted web test recording, generating reproducible test scripts.
Can also function in a (now legacy) execution mode.
"""
def __init__(self,
llm_client: LLMClient,
headless: bool = True, # Note: Recorder mode forces non-headless
max_iterations: int = 50, # Max planned steps to process in recorder
max_history_length: int = 10,
max_retries_per_subtask: int = 1, # Retries for *AI suggestion* or failed *execution* during recording
max_extracted_data_history: int = 7, # Less relevant for recorder? Keep for now.
is_recorder_mode: bool = False,
automated_mode: bool = False,
filename: str = "",
baseline_dir: str = "./visual_baselines"):
self.llm_client = llm_client
self.is_recorder_mode = is_recorder_mode
self.baseline_dir = os.path.abspath(baseline_dir) # Store baseline dir
# Ensure baseline dir exists during initialization
try:
os.makedirs(self.baseline_dir, exist_ok=True)
logger.info(f"Visual baseline directory set to: {self.baseline_dir}")
except OSError as e:
logger.warning(f"Could not create baseline directory '{self.baseline_dir}': {e}. Baseline saving might fail.")
# Determine effective headless: Recorder forces non-headless unless automated
effective_headless = headless
if self.is_recorder_mode and not automated_mode:
effective_headless = False # Interactive recording needs visible browser
if headless:
logger.warning("Interactive Recorder mode initiated, but headless=True was requested. Forcing headless=False.")
elif automated_mode and not headless:
logger.info("Automated mode running with visible browser (headless=False).")
self.browser_controller = BrowserController(headless=effective_headless, auth_state_path='_'.join([filename, "auth_state.json"]))
self.panel = Panel()
# TaskManager manages the *planned* steps generated by LLM initially
self.task_manager = TaskManager(max_retries_per_subtask=max_retries_per_subtask)
self.history: List[Dict[str, Any]] = []
self.extracted_data_history: List[Dict[str, Any]] = [] # Keep for potential context, but less critical now
self.max_iterations = max_iterations # Limit for planned steps processing
self.max_history_length = max_history_length
self.max_extracted_data_history = max_extracted_data_history
self.output_file_path: Optional[str] = None # Path for the recorded JSON
self.file_name = filename
self.feature_description: Optional[str] = None
self._latest_dom_state: Optional[DOMState] = None
self._consecutive_suggestion_failures = 0 # Track failures for the *same* step index
self._last_failed_step_index = -1 # Track which step index had the last failure
self._last_static_id_map: Dict[str, 'DOMElementNode'] = {}
# --- Recorder Specific State ---
self.recorded_steps: List[Dict[str, Any]] = []
self._current_step_id = 1 # Counter for recorded steps
self._user_abort_recording = False
# --- End Recorder Specific State ---
self.automated_mode = automated_mode
# Log effective mode
automation_status = "Automated" if self.automated_mode else "Interactive"
logger.info(f"WebAgent (Recorder Mode / {automation_status}) initialized (headless={effective_headless}, max_planned_steps={max_iterations}, max_hist={max_history_length}, max_retries={max_retries_per_subtask}).")
logger.info(f"Visual baseline directory: {self.baseline_dir}")
def _add_to_history(self, entry_type: str, data: Any):
"""Adds an entry to the agent's history, maintaining max length."""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_data_str = "..."
try:
# Basic sanitization (same as before)
if isinstance(data, dict):
log_data = {k: (str(v)[:200] + '...' if len(str(v)) > 200 else v)
for k, v in data.items()}
elif isinstance(data, (str, bytes)):
log_data = str(data[:297]) + "..." if len(data) > 300 else str(data)
else:
log_data = data
log_data_str = str(log_data)
if len(log_data_str) > 300: log_data_str = log_data_str[:297]+"..."
except Exception as e:
logger.warning(f"Error sanitizing history data: {e}")
log_data = f"Error processing data: {e}"
log_data_str = log_data
entry = {"timestamp": timestamp, "type": entry_type, "data": log_data}
self.history.append(entry)
if len(self.history) > self.max_history_length:
self.history.pop(0)
logger.debug(f"[HISTORY] Add: {entry_type} - {log_data_str}")
def _get_history_summary(self) -> str:
"""Provides a concise summary of the recent history for the LLM."""
if not self.history: return "No history yet."
summary = "Recent History (Oldest First):\n"
for entry in self.history:
entry_data_str = str(entry['data'])
if len(entry_data_str) > 300: entry_data_str = entry_data_str[:297] + "..."
summary += f"- [{entry['type']}] {entry_data_str}\n"
return summary.strip()
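# Illustrative summary returned above (hypothetical entries):
#   Recent History (Oldest First):
#   - [Test Plan Created] {'feature': "Test login on example.com ...", 'steps': [...]}
#   - [LLM Verification Result] {'verified': True, 'assertion_type': 'assert_text_contains', ...}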
def _clean_llm_response_to_json(self, llm_output: str) -> Optional[Dict[str, Any]]:
"""Attempts to extract and parse JSON from the LLM's output."""
logger.debug(f"[LLM PARSE] Attempting to parse LLM response (length: {len(llm_output)}).")
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", llm_output, re.DOTALL | re.IGNORECASE)
if match:
json_str = match.group(1).strip()
logger.debug(f"[LLM PARSE] Extracted JSON from markdown block.")
else:
start_index = llm_output.find('{')
end_index = llm_output.rfind('}')
if start_index != -1 and end_index != -1 and end_index > start_index:
json_str = llm_output[start_index : end_index + 1].strip()
logger.debug(f"[LLM PARSE] Attempting to parse extracted JSON between {{ and }}.")
else:
logger.warning("[LLM PARSE] Could not find JSON structure in LLM output.")
self._add_to_history("LLM Parse Error", {"reason": "No JSON structure found", "raw_output_snippet": llm_output[:200]})
return None
# Pre-processing (same as before)
try:
def escape_quotes_replacer(match):
key_part, colon_part, open_quote, value, close_quote = match.groups()
escaped_value = re.sub(r'(?<!\\)"', r'\\"', value)
return f'{key_part}{colon_part}{open_quote}{escaped_value}{close_quote}'
keys_to_escape = ["selector", "text", "reasoning", "url", "result", "answer", "reason", "file_path", "expected_text", "attribute_name", "expected_value"]
pattern_str = r'(\"(?:' + '|'.join(keys_to_escape) + r')\")(\s*:\s*)(\")(.*?)(\")'
pattern = re.compile(pattern_str, re.DOTALL)
json_str = pattern.sub(escape_quotes_replacer, json_str)
json_str = json_str.replace('\\\\n', '\\n').replace('\\n', '\n')
json_str = json_str.replace('\\\\"', '\\"')
json_str = json_str.replace('\\\\t', '\\t')
json_str = re.sub(r',\s*([\}\]])', r'\1', json_str)
except Exception as clean_e:
logger.warning(f"[LLM PARSE] Error during pre-parsing cleaning: {clean_e}")
# Attempt Parsing (check for 'action' primarily, parameters might be optional for some recorder actions)
try:
parsed_json = json.loads(json_str)
if isinstance(parsed_json, dict) and "action" in parsed_json:
# Parameters might not always be present (e.g., simple scroll)
if "parameters" not in parsed_json:
parsed_json["parameters"] = {} # Ensure parameters key exists
logger.debug(f"[LLM PARSE] Successfully parsed action JSON: {parsed_json}")
return parsed_json
else:
logger.warning(f"[LLM PARSE] Parsed JSON missing 'action' key or is not a dict: {parsed_json}")
self._add_to_history("LLM Parse Error", {"reason": "Missing 'action' key", "parsed_json": parsed_json, "cleaned_json_string": json_str[:200]})
return None
except json.JSONDecodeError as e:
logger.error(f"[LLM PARSE] Failed to decode JSON from LLM output: {e}")
logger.error(f"[LLM PARSE] Faulty JSON string snippet (around pos {e.pos}): {json_str[max(0, e.pos-50):e.pos+50]}")
self._add_to_history("LLM Parse Error", {"reason": f"JSONDecodeError: {e}", "error_pos": e.pos, "json_string_snippet": json_str[max(0, e.pos-50):e.pos+50]})
return None
except Exception as e:
logger.error(f"[LLM PARSE] Unexpected error during final JSON parsing: {e}", exc_info=True)
return None
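# Illustrative parse (hypothetical LLM output):
#   raw = 'Sure! ```json\n{"action": "click", "parameters": {"index": 3}}\n```'
#   self._clean_llm_response_to_json(raw)  -> {"action": "click", "parameters": {"index": 3}}
# Output lacking an "action" key, or containing no JSON object at all, returns None;
# a missing "parameters" key is filled in with an empty dict.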
def _plan_subtasks(self, feature_description: str):
"""Uses the LLM to break down the feature test into planned steps using generate_json."""
logger.info(f"Planning test steps for feature: '{feature_description}'")
self.feature_description = feature_description
# --- Prompt Construction (Adjusted for generate_json) ---
prompt = f"""
You are an AI Test Engineer planning steps for recording. Given the feature description: "{feature_description}"
Break this down into a sequence of specific browser actions or verification checks.
Each step should be a single instruction (e.g., "Navigate to...", "Click the 'Submit' button", "Type 'testuser' into username field", "Verify text 'Success' is visible").
The recorder agent will handle identifying elements and generating selectors based on these descriptions.
**Key Types of Steps to Plan:**
1. **Navigation:** `Navigate to https://example.com/login`
2. **Action:** `Click element 'Submit Button'` or `Type 'testuser' into element 'Username Input'` or `Check 'male' radio button or Check 'Agree to terms & conditions'` or `Uncheck the 'subscribe to newsletter' checkbox` (Describe the element clearly). **IMPORTANT for Dropdowns (<select>):** If the task involves selecting an option (e.g., "Select 'Canada' from the 'Country' dropdown"), generate a **SINGLE step** like: `Select option 'Canada' in element 'Country Dropdown'` (Describe the main `<select>` element and the option's visible text/label).
3. **Key Press:** `Press 'Enter' key on element 'Search Input'`, `Press 'Tab' key`. (Specify the element if the press is targeted, otherwise it might be global).
4. **Drag and Drop:** `Drag element 'Item A' onto element 'Cart Area'`. (Clearly describe source and target).
5. **Wait:** `Wait for 5 seconds`
6. **Verification:** Phrase as a check. The recorder will prompt for specifics.
- `Verify 'Login Successful' message is present`
- `Verify 'Cart Count' shows 1`
- `Verify 'Submit' button is disabled`
- **GOOD:** `Verify login success indicator is visible` (More general)
- **AVOID:** `Verify text 'Welcome John Doe!' is visible` (Too specific if name changes)
7. **Scrolling:** `Scroll down` (if content might be off-screen)
8. **Visual Baseline Capture:** If the feature description implies capturing a visual snapshot at a key state (e.g., after login, after adding to cart), use: `Visually baseline the [short description of state]`. Examples: `Visually baseline the login page`, `Visually baseline the dashboard after login`, `Visually baseline the product details page`.
**CRITICAL:** Focus on the *intent* of each step. Do NOT include specific selectors or indices in the plan. The recorder determines those interactively.
**Output Format:** Respond with a JSON object conforming to the following structure:
{{
"planned_steps": ["Step 1 description", "Step 2 description", ...]
}}
Example Test Case: "Test login on example.com with user 'tester' and pass 'pwd123', then verify the welcome message 'Welcome, tester!' is shown."
Example JSON Output Structure:
{{
"planned_steps": [
"Navigate to https://example.com/login",
"Type 'tester' into element 'username input field'",
"Type 'pwd123' into element 'password input field'",
"Click element 'login button'",
"Verify 'Welcome, tester!' message is present"
"Visually baseline the user dashboard"
]
}}
Now, generate the JSON object containing the planned steps for: "{feature_description}"
"""
# --- End Prompt ---
logger.debug(f"[TEST PLAN] Sending Planning Prompt (snippet):\n{prompt[:500]}...")
response_obj = self.llm_client.generate_json(PlanSubtasksSchema, prompt)
subtasks = None
raw_response_for_history = "N/A (Used generate_json)"
if isinstance(response_obj, PlanSubtasksSchema):
logger.debug(f"[TEST PLAN] LLM JSON response parsed successfully: {response_obj}")
# Validate the parsed list
if isinstance(response_obj.planned_steps, list) and all(isinstance(s, str) and s for s in response_obj.planned_steps):
subtasks = response_obj.planned_steps
logger.info(f"Subtasks: {subtasks}")
else:
logger.warning(f"[TEST PLAN] Parsed JSON planned_steps is not a list of non-empty strings: {response_obj.planned_steps}")
raw_response_for_history = f"Parsed object invalid content: {response_obj}" # Log the invalid object
elif isinstance(response_obj, str): # Handle error string from generate_json
logger.error(f"[TEST PLAN] Failed to generate/parse planned steps JSON from LLM: {response_obj}")
raw_response_for_history = response_obj[:500]+"..."
else: # Handle unexpected return type
logger.error(f"[TEST PLAN] Unexpected response type from generate_json: {type(response_obj)}")
raw_response_for_history = f"Unexpected type: {type(response_obj)}"
# --- Update Task Manager ---
if subtasks and len(subtasks) > 0:
self.task_manager.add_subtasks(subtasks) # TaskManager stores the *planned* steps
self._add_to_history("Test Plan Created", {"feature": feature_description, "steps": subtasks})
logger.info(f"Successfully planned {len(subtasks)} test steps.")
logger.debug(f"[TEST PLAN] Planned Steps: {subtasks}")
else:
logger.error("[TEST PLAN] Failed to generate or parse valid planned steps from LLM response.")
# Use the captured raw_response_for_history which contains error details
self._add_to_history("Test Plan Failed", {"feature": feature_description, "raw_response": raw_response_for_history})
raise ValueError("Failed to generate a valid test plan from the feature description.")
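# Illustrative plan handed to TaskManager.add_subtasks on success (hypothetical steps):
#   ["Navigate to https://example.com/login",
#    "Type 'tester' into element 'username input field'",
#    "Click element 'login button'",
#    "Verify 'Welcome, tester!' message is present"]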
def _save_visual_baseline(self, baseline_id: str, screenshot_bytes: bytes, selector: Optional[str] = None) -> bool:
"""Saves the screenshot and metadata for a visual baseline."""
if not screenshot_bytes:
logger.error(f"Cannot save baseline '{baseline_id}', no screenshot bytes provided.")
return False
image_path = os.path.join(self.baseline_dir, f"{baseline_id}.png")
metadata_path = os.path.join(self.baseline_dir, f"{baseline_id}.json")
# --- Prevent Overwrite (Optional - Prompt user or fail) ---
if os.path.exists(image_path) or os.path.exists(metadata_path):
if self.automated_mode:
logger.warning(f"Baseline '{baseline_id}' already exists. Overwriting in automated mode.")
# Allow overwrite in automated mode
else: # Interactive mode
overwrite = input(f"Baseline '{baseline_id}' already exists. Overwrite? (y/N) > ").strip().lower()
if overwrite != 'y':
logger.warning(f"Skipping baseline save for '{baseline_id}' - user chose not to overwrite.")
return False # Indicate skipped save
# --- End Overwrite Check ---
try:
# 1. Save Image
img = Image.open(io.BytesIO(screenshot_bytes))
img.save(image_path, format='PNG')
logger.info(f"Saved baseline image to: {image_path}")
# 2. Gather Metadata
current_url = self.browser_controller.get_current_url()
viewport_size = self.browser_controller.get_viewport_size()
browser_info = self.browser_controller.get_browser_version()
os_info = self.browser_controller.get_os_info()
timestamp = datetime.now(timezone.utc).isoformat()
metadata = {
"baseline_id": baseline_id,
"image_file": os.path.basename(image_path), # Store relative path
"created_at": timestamp,
"updated_at": timestamp, # Same initially
"url_captured": current_url,
"viewport_size": viewport_size,
"browser_info": browser_info,
"os_info": os_info,
"selector_captured": selector # Store selector if it was an element capture
}
# 3. Save Metadata
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
logger.info(f"Saved baseline metadata to: {metadata_path}")
return True # Success
except Exception as e:
logger.error(f"Error saving baseline '{baseline_id}' (Image: {image_path}, Meta: {metadata_path}): {e}", exc_info=True)
# Clean up potentially partially saved files
if os.path.exists(image_path): os.remove(image_path)
if os.path.exists(metadata_path): os.remove(metadata_path)
return False # Failure
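# Illustrative on-disk result of a successful save (hypothetical values; the exact shape of the
# viewport/browser/os fields depends on BrowserController):
#   <baseline_dir>/login_page.png
#   <baseline_dir>/login_page.json containing e.g.
#     {"baseline_id": "login_page", "image_file": "login_page.png",
#      "created_at": "2024-01-01T12:00:00+00:00", "updated_at": "2024-01-01T12:00:00+00:00",
#      "url_captured": "https://example.com/login", "viewport_size": {"width": 1280, "height": 720},
#      "browser_info": "...", "os_info": "...", "selector_captured": null}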
def _get_llm_verification(self,
verification_description: str,
current_url: str,
dom_context_str: str,
static_id_map: Dict[str, Any],
screenshot_bytes: Optional[bytes] = None,
previous_error: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""
Uses LLM's generate_json (potentially multimodal) to verify if a condition is met.
Returns a dictionary representation of the result or None on error.
"""
logger.info(f"Requesting LLM verification (using generate_json) for: '{verification_description}'")
logger.info(f"""dom_context_str:
{dom_context_str}
""")
# --- Prompt Adjustment for generate_json ---
# Pre-compute the optional feedback block; f-string expressions may not contain backslashes before Python 3.12.
previous_error_note = f"\n**Previous Attempt Feedback:**\nA previous verification attempt for this step resulted in an error: {previous_error}\nPlease carefully re-evaluate the current state and suggest a correct and verifiable assertion, or indicate if the verification still fails.\n" if previous_error else ""
prompt = f"""
You are an AI Test Verification Assistant. Your task is to determine if a specific condition, **or its clear intent**, is met based on the current web page state and If the goal IS met, propose the **most robust and specific, deterministic assertion** possible to confirm this state.
**Overall Goal:** {self.feature_description}
**Verification Step:** {verification_description}
**Current URL:** {current_url}
{f"\n**Previous Attempt Feedback:**\nA previous verification attempt for this step resulted in an error: {previous_error}\nPlease carefully re-evaluate the current state and suggest a correct and verifiable assertion, or indicate if the verification still fails.\n" if previous_error else ""}
**Input Context (Visible Elements with Indices for Interactive ones):**
This section shows visible elements on the page.
- Interactive elements are marked with `[index]` (e.g., `[5]<button>Submit</button>`).
- Static elements crucial for context are marked with `(Static)` (e.g., `<p (Static)>Login Successful!</p>`).
- Some plain static elements may include a hint about their parent, like `(inside: <div id="summary">)`, to help locate them.
- Some may be not visible but are interactive, assertable
```html
{dom_context_str}
```
{f"**Screenshot Analysis:** Please analyze the attached screenshot for visual confirmation or contradiction of the verification step." if screenshot_bytes else "**Note:** No screenshot provided for visual analysis."}
**Your Task:**
1. Analyze the provided context (DOM, URL, and screenshot if provided).
2. Determine if the **intent** behind the "Verification Step" is currently TRUE or FALSE.
* Example: If the step is "Verify 'Login Complete'", but the page shows "Welcome, User!", the *intent* IS met.
3. Respond with a JSON object matching the required schema.
* Set the `verified` field (boolean).
* Provide detailed `reasoning` (string), explaining *how* the intent is met or why it failed.
* **If `verified` is TRUE:**
* Identify the **single most relevant element** (interactive OR static) in the context that **specifically confirms the successful outcome** of the preceding action(s).
* If confirmed by an **interactive element `[index]`**: Set `element_index` to that index. `verification_static_id` should be null.
* If confirmed by a **static element shown with `data-static-id="sXXX"`**: Set `verification_static_id` to that ID string (e.g., "s15"). `element_index` should be null.
* **`verification_selector` (Set to null by you):** The system will generate the final CSS selector based on the provided index or static ID. Leave this field as `null`.
* **`assertion_type` (Required):** Propose a specific, deterministic assertion. Determine the most appropriate assertion type based on the **actual observed state** and the verification intent. **Prefer `assert_text_contains` for text verification unless the step demands an *exact* match.**
* Use `assert_llm_verification` for cases where vision LLMs will be better than any other selector for this problem. Visual UI stuff like overflow, truncation, overlap, positioning all this can't be determined via normal playwright automation. You need llm vision verification for such stuff.
* Use `assert_text_equals` / `assert_text_contains` for text content. Prefer this over visible unless text doesn't confirm the verification. Note to use the **EXACT TEXT INCLUDING ANY PUNCTUATION**
* Use `assert_checked` if the intent is to verify a checkbox or radio button **is currently selected/checked**.
* Use `assert_not_checked` if the intent is to verify it **is NOT selected/checked**.
* Use `assert_visible` / `assert_hidden` for visibility states.
* Use `assert_disabled` / `assert_enabled` for checking disabled states
* Use `assert_attribute_equals` ONLY for comparing the *string value* of an attribute (e.g., `class="active"`, `value="Completed"`). **DO NOT use it for boolean attributes like `checked`, `disabled`, `selected`. Use state assertions instead.**
* Use `assert_element_count` for counting elements matching a selector. **Note that you can't count child elements by using static id of parent elements. You need to use the selector for the elements you need to count**
* **`parameters` (Optional):** Provide necessary parameters ONLY if the chosen `assertion_type` requires them (e.g., `assert_text_equals` needs `expected_text`). For `assert_checked`, `assert_not_checked`, `assert_visible`, `assert_hidden`, `assert_disabled`, `assert_enabled`, parameters should generally be empty (`{{}}`) or omitted. Ensure parameters reflect the *actual observed state* (e.g., observed text).
* **Guidance for Robustness:**
* **Prefer specific checks:** `assert_text_contains` on a specific message is better than just `assert_visible` on a generic container *if* the text confirms the verification goal.
* **Dynamic Content:** For content that loads (e.g., images replacing placeholders, data appearing), prefer assertions on **stable containers** (`assert_element_count`, `assert_visible` on the container) or the **final loaded state** if reliably identifiable, rather than the transient loading state (like placeholder text).
* **Element Targeting:** Identify the **single most relevant element** (interactive `[index]` OR static `data-static-id="sXXX"`) that **proves** the assertion. Set `element_index` OR `verification_static_id` accordingly.
* **`verification_selector` (Set to null):** The system generates the final selector. Leave this null.
* **`parameters` (Required if needed):** Provide necessary parameters based on the chosen `assertion_type` and the **actual observed state** (e.g., the exact text seen for `assert_text_contains`). Empty `{{}}` if no parameters needed (e.g., `assert_visible`).
* **If `verified` is FALSE:**
* `assertion_type`, `element_index`, `verification_selector`, `parameters` should typically be null/omitted.
**JSON Output Structure Examples:**
*Success Case (Static Element Confirms):*
```json
{{
"verified": true,
"assertion_type": "assert_text_contains", // Preferred over just visible
"element_index": null,
"verification_static_id": "s23", // ID from context
"verification_selector": null,
"parameters": {{ "expected_text": "logged in!" }}, // Actual text observed
"reasoning": "The static element <p data-static-id='s23'> shows 'logged in!', fulfilling the verification step's goal."
}}
```
*Success Case (Element Count Confirms):*
```json
{{
"verified": true,
"assertion_type": "assert_element_count", // Verifying 5 items loaded
"element_index": null, // Index not needed if counting multiple matches
"verification_static_id": "s50", // Target the container element
"verification_selector": null, // System will generate selector for container s50
"parameters": {{ "expected_count": 5 }},
"reasoning": "The list container <ul data-static-id='s50'> visually contains 5 items, confirming the verification requirement."
}}
```
*Success Case (Vision-LLM Verification):*
```json
{{
"verified": true,
"assertion_type": "assert_llm_verification",
"element_index": null,
"verification_static_id": null,
"verification_selector": null,
"parameters": {{ }},
"reasoning": "Checking if text is overflowing is best suited for a vision assisted llm verfication"
}}
```
*Success Case (Radio Button Checked):*
```json
{{
"verified": true,
"assertion_type": "assert_checked",
"element_index": 9, // <-- LLM provides interactive index
"verification_static_id": null,
"verification_selector": null, // <-- LLM sets to null
"parameters": {{}},
"reasoning": "The 'Credit Card' radio button [9] is selected on the page, fulfilling the verification requirement."
}}
```
*Success Case (Checkbox Not Checked - Interactive):*
```json
{{
"verified": true,
"assertion_type": "assert_not_checked",
"element_index": 11, // <-- LLM provides interactive index
"verification_static_id": null,
"verification_selector": null, // <-- LLM sets to null
"parameters": {{}},
"reasoning": "The 'Subscribe' checkbox [11] is not checked, as required by the verification step."
}}
```
*Success Case (Interactive Element Confirms - Visible):*
```json
{{
"verified": true,
"assertion_type": "assert_visible",
"element_index": 8, // <-- LLM provides interactive index
"verification_static_id": null,
"verification_selector": null, // <-- LLM sets to null
"parameters": {{}},
"reasoning": "Element [8] (logout button) is visible, confirming the user is logged in as per the verification step intent."
}}
```
*Success Case (Attribute on Static Element):*
```json
{{
"verified": true,
"assertion_type": "assert_attribute_equals",
"element_index": null,
"verification_static_id": "s45", // <-- LLM provides static ID
"verification_selector": null, // <-- LLM sets to null
"parameters": {{ "attribute_name": "data-status", "expected_value": "active" }},
"reasoning": "The static <div data-static-id='s45' data-status='active' (Static)> element has the 'data-status' attribute set to 'active', confirming the verification requirement."
}}
```
*Failure Case:*
```json
{{
"verified": false,
"assertion_type": null,
"element_index": null,
"verification_static_id": null,
"verification_selector": null,
"parameters": {{}},
"reasoning": "Could not find the 'Success' message or any other indication of successful login in the provided context or screenshot."
}}
```
**CRITICAL:** If `verified` is true, provide *either* `element_index` for interactive elements OR `verification_static_id` for static elements. **Do not generate the `verification_selector` yourself; set it to `null`.** Explain any discrepancies between the plan and the observed state in `reasoning`. Respond ONLY with the JSON object matching the schema.
Now, generate the verification JSON for: "{verification_description}"
"""
if previous_error:
logger.warning(f"[LLM VERIFY] Previous verification error fed back into the prompt: {previous_error}")
# Call generate_json, passing image_bytes if available
logger.debug("[LLM VERIFY] Sending prompt (and potentially image) to generate_json...")
response_obj = self.llm_client.generate_json(
LLMVerificationSchema,
prompt,
image_bytes=screenshot_bytes # Pass the image bytes here
)
verification_json = None # Initialize
if isinstance(response_obj, LLMVerificationSchema):
logger.debug(f"[LLM VERIFY] Successfully parsed response: {response_obj}")
verification_dict = response_obj.model_dump(exclude_none=True)
assertion_type = verification_dict.get("assertion_type")
params = verification_dict.get("parameters", {})
needs_params = assertion_type in ['assert_text_equals', 'assert_text_contains', 'assert_attribute_equals', 'assert_element_count']
no_params_needed = assertion_type in ['assert_checked', 'assert_not_checked', 'assert_disabled', 'assert_enabled', 'assert_visible', 'assert_hidden', 'assert_llm_verification']
# --- Post-hoc Validation ---
is_verified_by_llm = verification_dict.get("verified")
if is_verified_by_llm is None:
logger.error("[LLM VERIFY FAILED] Parsed JSON missing required 'verified' field.")
self._add_to_history("LLM Verification Error", {"reason": "Missing 'verified' field", "parsed_dict": verification_dict})
return None
if not verification_dict.get("reasoning"):
logger.warning(f"[LLM VERIFY] Missing 'reasoning' in response: {verification_dict}")
verification_dict["reasoning"] = "No reasoning provided by LLM."
# --- Selector Generation Logic ---
final_selector = None # Initialize selector to be potentially generated
if is_verified_by_llm:
static_id = verification_dict.get("verification_static_id")
interactive_index = verification_dict.get("element_index") # Keep original name from schema
if static_id:
# --- Static element identified by ID ---
target_node = static_id_map.get(static_id)
if target_node:
logger.info(f"LLM identified static element via ID: {static_id}. Generating selector...")
# Import or access DomService appropriately here
from ..dom.service import DomService
try:
# Generate selector using Python logic
final_selector = DomService._enhanced_css_selector_for_element(target_node)
if not final_selector:
logger.error(f"Failed to generate selector for static node (ID: {static_id}). Falling back to XPath.")
final_selector = f"xpath={target_node.xpath}" # Fallback
verification_dict["verification_selector"] = final_selector
verification_dict["_static_id_used"] = static_id
verification_dict["element_index"] = None # Ensure index is None for static
logger.info(f"Generated selector for static ID {static_id}: {final_selector}")
except Exception as gen_err:
logger.error(f"Error generating selector for static ID {static_id}: {gen_err}")
# Decide how to handle: fail verification? Use XPath?
verification_dict["verification_selector"] = f"xpath={target_node.xpath}" # Fallback
verification_dict["element_index"] = None
verification_dict["reasoning"] += f" [Selector generation failed: {gen_err}]"
else:
logger.error(f"LLM returned static ID '{static_id}' but it wasn't found in the context map!")
# Mark as failed or handle error appropriately
verification_dict["verified"] = False
verification_dict["reasoning"] = f"Verification failed: LLM hallucinated static ID '{static_id}'."
# Clear fields that shouldn't be present on failure
verification_dict.pop("assertion_type", None)
verification_dict.pop("verification_static_id", None)
verification_dict.pop("parameters", None)
elif interactive_index is not None:
# --- Interactive element identified by index ---
if self._latest_dom_state and self._latest_dom_state.selector_map:
target_node = self._latest_dom_state.selector_map.get(interactive_index)
if target_node and target_node.css_selector:
final_selector = target_node.css_selector
verification_dict["verification_selector"] = final_selector
# verification_dict["element_index"] is already set
logger.info(f"Using pre-generated selector for interactive index {interactive_index}: {final_selector}")
else:
logger.error(f"LLM returned interactive index [{interactive_index}] but node or selector not found in map!")
verification_dict["verified"] = False
verification_dict["reasoning"] = f"Verification failed: LLM hallucinated interactive index '{interactive_index}' or selector missing."
verification_dict.pop("assertion_type", None)
verification_dict.pop("element_index", None)
verification_dict.pop("parameters", None)
else:
logger.error("Cannot resolve interactive index: DOM state or selector map missing.")
verification_dict["verified"] = False
verification_dict["reasoning"] = "Verification failed: Internal error retrieving DOM state for interactive index."
verification_dict.pop("assertion_type", None)
verification_dict.pop("element_index", None)
verification_dict.pop("parameters", None)
else:
# Verified = true, but LLM provided neither static ID nor interactive index
logger.error("LLM verification PASSED but provided neither static ID nor interactive index!")
# verification_dict["verified"] = False
verification_dict["reasoning"] = "Verified to be true by using vision LLMs"
verification_dict.pop("assertion_type", None)
verification_dict.pop("parameters", None)
if final_selector and target_node and self.browser_controller.page:
try:
handles = self.browser_controller.page.query_selector_all(final_selector)
num_matches = len(handles)
validation_passed = False
if num_matches == 1:
# Get XPath of the element found by the generated selector
try:
# Traversable objects from importlib.resources expose read_text() directly;
# no context-manager protocol is needed (and zipped packages would not provide one).
js_code = resources.files(__package__).joinpath('js_utils', 'xpathgenerator.js').read_text(encoding='utf-8')
logger.debug("xpathgenerator.js loaded successfully.")
except FileNotFoundError:
logger.error("xpathgenerator.js not found in the 'agents' package directory!")
raise
except Exception as e:
logger.error(f"Error loading xpathgenerator.js: {e}", exc_info=True)
raise
# Pass a *function string* to evaluate. Playwright passes the handle as 'element'.
# The function string first defines our helper, then calls it.
script_to_run = f"""
(element) => {{
{js_code} // Define the helper function(s)
return generateXPathForElement(element); // Call it
}}
"""
# Use page.evaluate, passing the script string and the element handle
matched_xpath = self.browser_controller.page.evaluate(script_to_run, handles[0])
# Compare XPaths (simplest reliable comparison)
if target_node.xpath == matched_xpath:
validation_passed = True
logger.info(f"Validation PASSED: Selector '{final_selector}' uniquely matches target node.")
else:
logger.warning(f"Validation FAILED: Selector '{final_selector}' matched 1 element, but its XPath ('{matched_xpath}') differs from target node XPath ('{target_node.xpath}').")
elif num_matches == 0:
logger.warning(f"Validation FAILED: Selector '{final_selector}' matched 0 elements.")
else: # num_matches > 1
logger.warning(f"Validation FAILED: Selector '{final_selector}' matched {num_matches} elements (not unique).")
# --- Fallback to XPath if validation failed ---
if not validation_passed:
logger.warning(f"Falling back to XPath selector for target node.")
original_selector = final_selector # Keep for logging/reasoning
final_selector = f"xpath={target_node.xpath}"
# Update reasoning if possible
if "reasoning" in verification_dict:
verification_dict["reasoning"] += f" [Note: CSS selector ('{original_selector}') failed validation, using XPath fallback.]"
# Update the selector in the dictionary being built
verification_dict["verification_selector"] = final_selector
except Exception as validation_err:
logger.error(f"Error during selector validation ('{final_selector}'): {validation_err}. Falling back to XPath.")
original_selector = final_selector
final_selector = f"xpath={target_node.xpath}"
if "reasoning" in verification_dict:
verification_dict["reasoning"] += f" [Note: Error validating CSS selector ('{original_selector}'), using XPath fallback.]"
verification_dict["verification_selector"] = final_selector
# --- Post-hoc validation (same as before, applied to final verification_dict) ---
if verification_dict.get("verified"):
assertion_type = verification_dict.get("assertion_type")
params = verification_dict.get("parameters", {})
needs_params = assertion_type in ['assert_text_equals', 'assert_text_contains', 'assert_attribute_equals', 'assert_element_count']
no_params_needed = assertion_type in ['assert_checked', 'assert_not_checked', 'assert_enabled', 'assert_disabled', 'assert_visible', 'assert_hidden', 'assert_llm_verification']
if not verification_dict.get("verification_selector"):
logger.error("Internal Error: Verification marked passed but final selector is missing!")
# verification_dict["verified"] = False
verification_dict["reasoning"] = "Verified to be true by using vision LLMs"
elif needs_params and not params:
logger.warning(f"[LLM VERIFY WARN] Verified=true and assertion '{assertion_type}' typically needs parameters, but none provided: {verification_dict}")
elif no_params_needed and params:
logger.warning(f"[LLM VERIFY WARN] Verified=true and assertion '{assertion_type}' typically needs no parameters, but some provided: {params}. Using empty params.")
verification_dict["parameters"] = {}
# Assign the potentially modified dictionary
verification_json = verification_dict
elif isinstance(response_obj, str): # Handle error string
logger.error(f"[LLM VERIFY FAILED] LLM returned an error string: {response_obj}")
self._add_to_history("LLM Verification Failed", {"raw_error_response": response_obj})
return None
else: # Handle unexpected type
logger.error(f"[LLM VERIFY FAILED] Unexpected response type from generate_json: {type(response_obj)}")
self._add_to_history("LLM Verification Failed", {"response_type": str(type(response_obj))})
return None
if verification_json:
logger.info(f"[LLM VERIFY RESULT] Verified: {verification_json['verified']}, Selector: {verification_json.get('verification_selector')}, Reasoning: {verification_json.get('reasoning', '')[:150]}...")
self._add_to_history("LLM Verification Result", verification_json)
return verification_json # Return the dictionary
else:
logger.error("[LLM VERIFY FAILED] Reached end without valid verification_json.")
return None
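# Illustrative return value after selector resolution (hypothetical values): the LLM supplies the
# static ID / index, and this method fills in verification_selector itself.
#   {"verified": True, "assertion_type": "assert_text_contains",
#    "element_index": None, "verification_static_id": "s23", "_static_id_used": "s23",
#    "verification_selector": "div.alert > p.success",
#    "parameters": {"expected_text": "logged in"}, "reasoning": "..."}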
def _handle_llm_verification(self, planned_step: Dict[str, Any], verification_result: Dict[str, Any]) -> bool:
"""
Handles the user interaction and recording after LLM verification.
Now uses verification_selector as the primary target.
Returns True if handled (recorded/skipped), False if aborted.
"""
planned_desc = planned_step["description"]
is_verified_by_llm = verification_result['verified']
reasoning = verification_result.get('reasoning', 'N/A')
step_handled = False # Flag to track if a decision was made
parameter_name = None
if is_verified_by_llm:
final_selector = verification_result.get("verification_selector")
assertion_type = verification_result.get("assertion_type")
parameters = verification_result.get("parameters", {})
interactive_index = verification_result.get("element_index") # Optional index hint
static_id = verification_result.get("_static_id_used") # Optional static ID hint
# --- Perform Quick Validation using Playwright ---
validation_passed = False
validation_error = "Validation prerequisites not met (missing type/selector, unless assert_llm_verification)."
# Only validate if type and selector are present (or if type is assert_llm_verification)
if assertion_type and (final_selector or assertion_type == 'assert_llm_verification' or assertion_type == 'assert_passed_verification'):
logger.info("Performing quick Playwright validation of LLM's suggested assertion...")
validation_passed, validation_error = self.browser_controller.validate_assertion(
assertion_type, final_selector, parameters
)
elif not assertion_type:
validation_passed = True
assertion_type = "assert_llm_verification"
# NOTE THAT THIS IS CURRENTLY TREATED AS VERIFIED BY VISION LLM
# (selector check is handled inside validate_assertion)
if not validation_passed:
# --- Validation FAILED ---
logger.warning(f"LLM assertion validation FAILED for step '{planned_desc}': {validation_error}")
error_message_for_task = f"LLM assertion validation failed: {validation_error}. Original AI reasoning: {reasoning}"
# Mark task as failed so the main loop can handle retry/re-planning, passing the validation error
self.task_manager.update_subtask_status(
self.task_manager.current_subtask_index,
"failed",
error=error_message_for_task,
force_update=True # Ensure status changes
)
# No UI shown here, let main loop retry and pass error back to _get_llm_verification
return True # Indicate step was handled (by failing validation)
# --- Validation PASSED ---
logger.info("LLM assertion validation PASSED.")
# Proceed with highlighting, panel display (interactive), or direct recording (automated)
# --- Automated Mode (Validated Success) ---
if self.automated_mode:
logger.info(f"[Auto Mode] Handling Validated LLM Verification for: '{planned_desc}'")
logger.info(f"[Auto Mode] AI Result: PASSED. Validated Assertion: {assertion_type} on {final_selector}")
highlight_color = "#008000" if static_id else "#0000FF" # Green for static, Blue for interactive/direct
highlight_text = "Verify Target (Static)" if static_id else "Verify Target"
highlight_idx_display = 0 if static_id else (interactive_index if interactive_index is not None else 0)
self.browser_controller.clear_highlights()
try:
# Find node XPath if possible for better highlighting fallback
target_node_xpath = None
target_node = None
if static_id and hasattr(self, '_last_static_id_map') and self._last_static_id_map:
target_node = self._last_static_id_map.get(static_id)
elif interactive_index is not None and self._latest_dom_state and self._latest_dom_state.selector_map:
target_node = self._latest_dom_state.selector_map.get(interactive_index)
if target_node: target_node_xpath = target_node.xpath
self.browser_controller.highlight_element(final_selector, highlight_idx_display, color=highlight_color, text=highlight_text, node_xpath=target_node_xpath)
except Exception as hl_err:
logger.warning(f"Could not highlight verification target '{final_selector}': {hl_err}")
print(f"AI suggests assertion on element: {final_selector} (Highlight failed)")
# Record the validated assertion automatically
logger.info(f"[Auto Mode] Recording validated assertion: {assertion_type} on {final_selector}")
record = {
"step_id": self._current_step_id,
"action": assertion_type,
"description": planned_desc,
"parameters": parameters,
"selector": final_selector,
"wait_after_secs": 0
}
self.recorded_steps.append(record)
self._current_step_id += 1
logger.info(f"Step {record['step_id']} recorded (AI Verified & Validated - Automated)")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded validated AI assertion (automated) as step {record['step_id']}")
step_handled = True
self.browser_controller.clear_highlights() # Clear highlights even in auto mode
return True # Indicate handled
# --- Interactive Mode (Validated Success) ---
else:
print("\n" + "="*60)
print(f"Planned Step: {planned_desc}")
print(f"Review AI Verification Result (Validated) in Panel...")
# Highlight element
final_selector = verification_result.get("verification_selector") # Re-fetch just in case
interactive_index = verification_result.get("element_index")
static_id = verification_result.get("_static_id_used")
highlight_color = "#008000" if static_id else "#0000FF" # Green for static, Blue for interactive/direct
highlight_text = "Verify Target (Static)" if static_id else "Verify Target"
highlight_idx_display = 0 if static_id else (interactive_index if interactive_index is not None else 0)
self.browser_controller.clear_highlights()
try:
target_node_xpath = None
target_node = None
if static_id and hasattr(self, '_last_static_id_map') and self._last_static_id_map:
target_node = self._last_static_id_map.get(static_id)
elif interactive_index is not None and self._latest_dom_state and self._latest_dom_state.selector_map:
target_node = self._latest_dom_state.selector_map.get(interactive_index)
if target_node: target_node_xpath = target_node.xpath
self.browser_controller.highlight_element(final_selector, highlight_idx_display, color=highlight_color, text=highlight_text, node_xpath=target_node_xpath)
print(f"AI suggests assertion on element (Validation PASSED): {final_selector}")
except Exception as hl_err:
logger.warning(f"Could not highlight verification target '{final_selector}': {hl_err}")
print(f"AI suggests assertion on element (Validation PASSED): {final_selector} (Highlight failed)")
print("="*60)
# Show the review panel (button should be enabled now)
self.panel.show_verification_review_panel(planned_desc, verification_result)
# Wait for user choice from panel
user_choice = self.panel.wait_for_panel_interaction(30.0) # Give more time for review
if not user_choice: user_choice = 'skip' # Default to skip on timeout
# --- Process User Choice ---
if user_choice == 'record_ai':
# Record validated assertion
final_selector = verification_result.get("verification_selector")
assertion_type = verification_result.get("assertion_type")
parameters = verification_result.get("parameters", {})
print(f"Recording validated AI assertion: {assertion_type} on {final_selector}")
record = { "step_id": self._current_step_id, "action": assertion_type, "description": planned_desc,
"parameters": parameters, "selector": final_selector, "wait_after_secs": 0 }
self.recorded_steps.append(record)
self._current_step_id += 1
logger.info(f"Step {record['step_id']} recorded (AI Verified & Validated)")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded validated AI assertion as step {record['step_id']}")
step_handled = True
elif user_choice == 'define_manual':
print("Switching to manual assertion definition...")
self.panel.hide_recorder_panel() # Hide review panel
# Call the existing manual handler
if not self._handle_assertion_recording(planned_step):
self._user_abort_recording = True # Propagate abort signal
step_handled = True # Manual path handles the step
elif user_choice == 'skip':
print("Skipping verification step.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped AI verification via panel")
step_handled = True
elif user_choice == 'abort':
print("Aborting recording.")
self._user_abort_recording = True
step_handled = False # Abort signal
else: # Includes timeout, None, unexpected values
print("Invalid choice or timeout during verification review. Skipping step.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Invalid choice/timeout on AI verification")
step_handled = True
# --- Cleanup UI ---
self.browser_controller.clear_highlights()
self.panel.hide_recorder_panel()
return step_handled and not self._user_abort_recording
else: # --- LLM verification FAILED initially ---
logger.warning(f"[Verification] AI verification FAILED for '{planned_desc}'. Reasoning: {reasoning}")
# Mode-dependent handling
if self.automated_mode:
logger.warning("[Auto Mode] Recording AI verification failure.")
failed_record = {
"step_id": self._current_step_id,
"action": "assert_failed_verification", # Specific action type
"description": planned_desc,
"parameters": {"reasoning": reasoning},
"selector": None, "wait_after_secs": 0
}
self.recorded_steps.append(failed_record)
self._current_step_id += 1
logger.info(f"Step {failed_record['step_id']} recorded (AI Verification FAILED - Automated)")
self.task_manager.update_subtask_status(
self.task_manager.current_subtask_index,
"failed", # Mark as skipped
result=f"AI verification failed: {reasoning}. Recorded as failed assertion.",
force_update=True
)
step_handled = True
else: # Interactive Mode - Fallback to Manual
print("\n" + "="*60)
print(f"Planned Step: {planned_desc}")
print(f"AI Verification Result: FAILED (Reason: {reasoning})")
print("Falling back to manual assertion definition...")
print("="*60)
self.browser_controller.clear_highlights()
self.panel.hide_recorder_panel() # Ensure review panel isn't shown
# Call the existing manual handler
if not self._handle_assertion_recording(planned_step):
self._user_abort_recording = True # Propagate abort
step_handled = True # Manual path handles the step
return step_handled and not self._user_abort_recording
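# Illustrative recorded_steps entries appended above (hypothetical values):
#   {"step_id": 7, "action": "assert_text_contains", "description": "Verify 'Login Successful' message is present",
#    "parameters": {"expected_text": "Login Successful"}, "selector": "div.alert > p", "wait_after_secs": 0}
#   {"step_id": 8, "action": "assert_failed_verification", "description": "Verify 'Cart Count' shows 1",
#    "parameters": {"reasoning": "No cart badge found"}, "selector": None, "wait_after_secs": 0}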
def _trigger_re_planning(self, current_planned_task: Dict[str, Any], reason: str) -> bool:
"""
Attempts to get recovery steps from the LLM (using generate_json, potentially multimodal)
when an unexpected state is detected.
Returns True if recovery steps were inserted, False otherwise (or if abort requested).
"""
logger.warning(f"Triggering re-planning due to: {reason}")
self._add_to_history("Re-planning Triggered", {"reason": reason, "failed_step_desc": current_planned_task['description']})
if self.automated_mode:
# Redirect console I/O for automated runs. NOTE: assigning to 'print'/'input' here makes
# them local names for this whole function, so the interactive branch must rebind them.
print = lambda *args, **kwargs: logger.info(f"[Auto Mode Replanning] {' '.join(map(str, args))}") # Redirect print
input = lambda *args, **kwargs: 'y' # Default to accepting suggestions in auto mode
else:
import builtins
print, input = builtins.print, builtins.input # Rebind builtins so the local names exist in interactive mode
print("\n" + "*"*60)
print("! Unexpected State Detected !")
print(f"Reason: {reason}")
print(f"Original Goal: {self.feature_description}")
print(f"Attempting Step: {current_planned_task['description']}")
print("Asking AI for recovery suggestions...")
print("*"*60)
# --- Gather Context for Re-planning ---
current_url = "Error getting URL"
dom_context_str = "Error getting DOM"
screenshot_bytes = None # Initialize
try:
current_url = self.browser_controller.get_current_url()
if self._latest_dom_state and self._latest_dom_state.element_tree:
dom_context_str, _ = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification')
screenshot_bytes = self.browser_controller.take_screenshot()
except Exception as e:
logger.error(f"Error gathering context for re-planning: {e}")
original_plan_str = "\n".join([f"- {t['description']}" for t in self.task_manager.subtasks])
history_summary = self._get_history_summary()
last_done_step_desc = "N/A (Start of test)"
for i in range(current_planned_task['index'] -1, -1, -1):
if self.task_manager.subtasks[i]['status'] == 'done':
last_done_step_desc = self.task_manager.subtasks[i]['description']
break
# --- Construct Re-planning Prompt (Adjusted for generate_json with image) ---
prompt = f"""
You are an AI Test Recorder Assistant helping recover from an unexpected state during test recording.
**Overall Goal:** {self.feature_description}
**Original Planned Steps:**
{original_plan_str}
**Current Situation:**
- Last successfully completed planned step: '{last_done_step_desc}'
- Currently trying to execute planned step: '{current_planned_task['description']}' (Attempt {current_planned_task['attempts']})
- Encountered unexpected state/error: {reason}
- Current URL: {current_url}
**Current Page Context (Interactive Elements with Indices. You can interact with non visible elements as well):**
```html
{dom_context_str}
```
{f"**Screenshot Analysis:** Please analyze the attached screenshot to understand the current visual state and identify elements relevant for recovery." if screenshot_bytes else "**Note:** No screenshot provided for visual analysis."}
**Your Task:**
Analyze the current situation, context (DOM, URL, screenshot if provided), and the overall goal. If you are not clear and think scrolling might help, you can scroll to see the complete page.
Generate a JSON object matching the required schema.
- **If recovery is possible:** Provide a **short sequence (1-3 steps)** of recovery actions in the `recovery_steps` field (list of strings). These steps should aim to get back on track towards the original goal OR correctly perform the intended action of the failed step ('{current_planned_task['description']}') in the *current* context. Focus ONLY on the immediate recovery. Example: `["Click element 'Close Popup Button'", "Verify 'Main Page Title' is visible"]`. `action` field should be null.
- **If recovery seems impossible, too complex, or unsafe:** Set the `action` field to `"abort"` and provide a brief explanation in the `reasoning` field. `recovery_steps` should be null. Example: `{{"action": "abort", "reasoning": "Critical error page displayed, cannot identify recovery elements."}}`
**JSON Output Structure Examples:**
*Recovery Possible:*
```json
{{
"recovery_steps": ["Click element 'Accept Cookies Button'", "Verify 'Main Page Title' is visible"],
"action": null,
"reasoning": null
}}
```
*Recovery Impossible:*
```json
{{
"recovery_steps": null,
"action": "abort",
"reasoning": "The application crashed, unable to proceed."
}}
```
Respond ONLY with the JSON object matching the schema.
"""
# --- Call LLM using generate_json, passing image_bytes ---
response_obj = None
error_msg = None
try:
logger.debug("[LLM REPLAN] Sending prompt (and potentially image) to generate_json...")
response_obj = self.llm_client.generate_json(
ReplanSchema,
prompt,
image_bytes=screenshot_bytes # Pass image here
)
logger.debug(f"[LLM REPLAN] Raw response object type: {type(response_obj)}")
except Exception as e:
logger.error(f"LLM call failed during re-planning: {e}", exc_info=True)
print("Error: Could not communicate with LLM for re-planning.")
error_msg = f"LLM communication error: {e}"
# --- Parse Response ---
recovery_steps = None
abort_action = False
abort_reasoning = "No reason provided."
if isinstance(response_obj, ReplanSchema):
logger.debug(f"[LLM REPLAN] Successfully parsed response: {response_obj}")
if response_obj.recovery_steps and isinstance(response_obj.recovery_steps, list):
if all(isinstance(s, str) and s for s in response_obj.recovery_steps):
recovery_steps = response_obj.recovery_steps
else:
logger.warning(f"[LLM REPLAN] Parsed recovery_steps list contains invalid items: {response_obj.recovery_steps}")
error_msg = "LLM provided invalid recovery steps."
elif response_obj.action == "abort":
abort_action = True
abort_reasoning = response_obj.reasoning or "No specific reason provided by AI."
logger.warning(f"AI recommended aborting recording. Reason: {abort_reasoning}")
else:
logger.warning("[LLM REPLAN] LLM response did not contain valid recovery steps or an abort action.")
error_msg = "LLM response was valid JSON but lacked recovery_steps or abort action."
elif isinstance(response_obj, str): # Handle error string from generate_json
logger.error(f"[LLM REPLAN] Failed to generate/parse recovery JSON: {response_obj}")
error_msg = f"LLM generation/parsing error: {response_obj}"
elif response_obj is None and error_msg: # Handle communication error from above
pass # error_msg already set
else: # Handle unexpected return type
logger.error(f"[LLM REPLAN] Unexpected response type from generate_json: {type(response_obj)}")
error_msg = f"Unexpected response type: {type(response_obj)}"
# --- Handle Outcome (Mode-dependent) ---
if abort_action:
print(f"\nAI Suggests Aborting: {abort_reasoning}")
if self.automated_mode:
print("[Auto Mode] Accepting AI abort suggestion.")
abort_choice = 'a'
else:
abort_choice = input("AI suggests aborting. Abort (A) or Ignore and Skip Failed Step (S)? > ").strip().lower()
if abort_choice == 'a':
self._user_abort_recording = True # Mark for abort
self._abort_reason = abort_reasoning
self.task_manager.update_subtask_status(current_planned_task['index'], "failed", error=f"Aborted based on AI re-planning suggestion: {abort_reasoning}", force_update=True)
return False # Abort
else: # Skipped (Interactive only)
logger.info("User chose to ignore AI abort suggestion and skip the failed step.")
self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped after AI suggested abort", force_update=True)
return False # Didn't insert steps
elif recovery_steps:
print("\nAI Suggested Recovery Steps:")
for i, step in enumerate(recovery_steps): print(f" {i+1}. {step}")
if self.automated_mode:
print("[Auto Mode] Accepting AI recovery steps.")
confirm_recovery = 'y'
else:
confirm_recovery = input("Attempt these recovery steps? (Y/N/Abort) > ").strip().lower()
if confirm_recovery == 'y':
logger.info(f"Attempting AI recovery steps: {recovery_steps}")
if self._insert_recovery_steps(current_planned_task['index'] + 1, recovery_steps):
# Record that the original step's intent is being handled by a re-plan
self.recorded_steps.append({
"step_id": self._current_step_id,
"action": "task_replanned", # Changed action name
"description": current_planned_task['description'], # Use original task description
"parameters": {
"reason_for_replan": reason,
"recovery_steps_planned": recovery_steps,
"original_task_index_in_plan": current_planned_task['index'] # For traceability
},
"selector": None,
"wait_after_secs": 0
})
self._current_step_id += 1
# Mark the original task as 'done' in TaskManager as its outcome is now via these recovery steps
self.task_manager.update_subtask_status(
current_planned_task['index'],
"done",
result=f"Step handled by re-planning. Details recorded as 'task_replanned'. Recovery steps: {recovery_steps}",
force_update=True # Ensure status is updated
)
self._consecutive_suggestion_failures = 0
return True # Indicate recovery steps were inserted
else: # Insertion failed (should be rare)
print("Error: Failed to insert recovery steps. Skipping original failed step.")
self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped (failed to insert AI recovery steps)", force_update=True)
return False
elif confirm_recovery == 'a': # Interactive only
self._user_abort_recording = True
return False # Abort
else: # N or invalid (Interactive or failed auto-acceptance)
print("Skipping recovery attempt and the original failed step.")
logger.info("User declined/skipped AI recovery steps. Skipping original failed step.")
self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result="Skipped (User/Auto declined AI recovery)", force_update=True)
return False # Skipped
else: # LLM failed to provide valid steps or abort
print(f"\nAI failed to provide valid recovery steps or an abort action. Reason: {error_msg or 'Unknown LLM issue'}")
if self.automated_mode:
print("[Auto Mode] Skipping failed step due to LLM re-planning failure.")
skip_choice = 's'
else:
skip_choice = input("Skip the current failed step (S) or Abort recording (A)? > ").strip().lower()
if skip_choice == 'a': # Interactive only possibility
self._user_abort_recording = True
return False # Abort
else: # Skip (default for auto mode, or user choice)
print("Skipping the original failed step.")
logger.warning(f"LLM failed re-planning ({error_msg}). Skipping original failed step.")
self.task_manager.update_subtask_status(current_planned_task['index'], "skipped", result=f"Skipped (AI re-planning failed: {error_msg})", force_update=True)
return False # Skipped
def _insert_recovery_steps(self, index: int, recovery_steps: List[str]) -> bool:
"""Calls TaskManager to insert steps."""
return self.task_manager.insert_subtasks(index, recovery_steps)
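# Illustrative effect (a sketch; assumes TaskManager.insert_subtasks splices the new
# descriptions into the plan as pending subtasks starting at `index`):
#   plan: [0 "Open login page" (done), 1 "Submit form" (failed)]
#   self._insert_recovery_steps(2, ["Dismiss cookie banner", "Submit form again"])
#   plan: [..., 2 "Dismiss cookie banner" (pending), 3 "Submit form again" (pending)]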
def _determine_action_and_selector_for_recording(self,
current_task: Dict[str, Any],
current_url: str,
dom_context_str: str # Now contains indexed elements with PRE-GENERATED selectors
) -> Optional[Dict[str, Any]]: # Keep return type as Dict for downstream compatibility
"""
Uses LLM (generate_json) to propose the browser action (click, type, select, check/uncheck, key_press, drag_and_drop) and identify the target *element index*
based on the planned step description and the DOM context. The robust selector is retrieved
from the DOM state afterwards. Returns a dictionary representation or None on error.
"""
logger.info(f"Determining AI suggestion for planned step: '{current_task['description']}'")
prompt = f"""
You are an AI assistant helping a user record a web test. Your goal is to interpret the user's planned step and identify the **single target interactive element** in the provided context that corresponds to it, then suggest the appropriate action.
**Feature Under Test:** {self.feature_description}
**Current Planned Step:** {current_task['description']}
**Current URL:** {current_url}
**Test Recording Progress:** Attempt {current_task['attempts']} of {self.task_manager.max_retries_per_subtask + 1} for this suggestion.
**Input Context (Visible Interactive Elements with Indices):**
This section shows interactive elements on the page, each marked with `[index]` and its pre-generated robust CSS selector. Non-visible elements in the context can also be interacted with.
```html
{dom_context_str}
```
**Your Task:**
Based ONLY on the "Current Planned Step" description and the "Input Context":
1. Determine the appropriate **action** (`click`, `type`, `select`, `check`, `uncheck`, `key_press`, `drag_and_drop`, `action_not_applicable`, `suggestion_failed`).
2. If action is `click`, `type`, `select`, `check`, `uncheck`, or `key_press`:
* Identify the **single most likely interactive element `[index]`** from the context that matches the description. Set `parameters.index`.
3. If action is `type`: Extract the **text** to be typed. Set `parameters.text`.
4. If action is `select`: Identify the main `<select>` element index and extract the target option's visible label into `parameters.option_label`.
5. If action is `key_press`: Identify the target element `[index]` and extract the key(s) to press. Set `parameters.keys`.
6. If action is `drag_and_drop`: Identify the source element `[index]` and the destination element. Set `parameters.index` and `parameters.destination_index`.
7. Provide brief **reasoning** linking the step description to the chosen index/action/parameters.
**Output JSON Structure Examples:**
*Click Action:*
```json
{{
"action": "click",
"parameters": {{"index": 12}},
"reasoning": "The step asks to click the 'Login' button, which corresponds to element [12]."
}}
```
*Type Action:*
```json
{{
"action": "type",
"parameters": {{"index": 5, "text": "user@example.com"}},
"reasoning": "The step asks to type 'user@example.com' into the email field, which is element [5]."
}}
```
*Check Action:*
```json
{{
"action": "check",
"parameters": {{"index": 8}},
"reasoning": "Step asks to check the 'Agree' checkbox [8]."
}}
```
*Uncheck Action:*
```json
{{
"action": "uncheck",
"parameters": {{"index": 9}},
"reasoning": "Step asks to uncheck 'Subscribe' [9]."
}}
```
*Key Press:*
```json
{{
"action": "key_press",
"parameters": {{"index": 3, "keys": "Enter"}},
"reasoning": "The step asks to press Enter on the search input [3]."
}}
```
*Drag and Drop:*
```json
{{
"action": "drag_and_drop",
"parameters": {{"index": 10, "destination_index": 15}},
"reasoning": "The step asks to drag the item [10] to the cart area [15]."
}}
```
*Select Action:*
```json
{{
"action": "select",
"parameters": {{"index": 12, "option_label": "Weekly"}},
"reasoning": "The step asks to select 'Weekly' in the 'Notification Frequency' dropdown [12]."
}}
```
*Not Applicable (Navigation/Verification):*
```json
{{
"action": "action_not_applicable",
"parameters": {{}},
"reasoning": "The step 'Navigate to ...' does not involve clicking or typing on an element from the context."
}}
```
*Suggestion Failed (Cannot identify element):*
```json
{{
"action": "suggestion_failed",
"parameters": {{}},
"reasoning": "Could not find a unique element matching 'the second confirmation button'."
}}
```
**CRITICAL INSTRUCTIONS:**
- Focus on the `[index]`; do NOT output selectors for `click`/`type` actions.
- For `select` action, identify the main `<select>` element index and extract the target option's label into `parameters.option_label`.
- For `key_press`, provide the target `index` and the `keys` string.
- For `drag_and_drop`, provide the source `index` and the `destination_index`.
- Use `action_not_applicable` for navigation, verification, scroll, wait steps.
- Be precise with extracted `text` for the `type` action.
Respond ONLY with the JSON object matching the schema.
"""
# --- End Prompt ---
# Add error context if retrying suggestion
if current_task['status'] == 'in_progress' and current_task['attempts'] > 1 and current_task.get('error'):
error_context = str(current_task['error'])[:300] + "..."
prompt += f"\n**Previous Suggestion Attempt Error:**\nAttempt {current_task['attempts'] - 1} failed: {error_context}\nRe-evaluate the description and context carefully.\n"
# Add history summary for general context
prompt += f"\n**Recent History (Context):**\n{self._get_history_summary()}\n"
logger.debug(f"[LLM RECORDER PROMPT] Sending prompt snippet for action/index suggestion:\n{prompt[:500]}...")
response_obj = self.llm_client.generate_json(RecorderSuggestionSchema, prompt)
suggestion_dict = None # Initialize
suggestion_failed = False
failure_reason = "LLM suggestion generation failed."
if isinstance(response_obj, RecorderSuggestionSchema):
logger.debug(f"[LLM RECORDER RESPONSE] Parsed suggestion: {response_obj}")
# Convert to dict for downstream use (or refactor downstream to use object)
suggestion_dict = response_obj.model_dump(exclude_none=True)
action = suggestion_dict.get("action")
reasoning = suggestion_dict.get("reasoning", "No reasoning provided.")
logger.info(f"[LLM Suggestion] Action: {action}, Params: {suggestion_dict.get('parameters')}, Reasoning: {reasoning[:150]}...")
self._add_to_history("LLM Suggestion", suggestion_dict)
# --- Basic Validation (Schema handles enum/types) ---
required_index_actions = ["click", "type", "check", "uncheck", "select", "key_press", "drag_and_drop"]
if action in required_index_actions:
target_index = suggestion_dict.get("parameters", {}).get("index")
if target_index is None: # Index is required for these actions
logger.error(f"LLM suggested action '{action}' but missing required index.")
suggestion_failed = True
failure_reason = f"LLM suggestion '{action}' missing required parameter 'index'."
elif action == "key_press" and suggestion_dict.get("parameters", {}).get("keys") is None:
logger.error("LLM suggested action 'key_press' but missing required keys.")
suggestion_failed = True
failure_reason = "LLM suggestion 'key_press' missing required parameter 'keys'."
elif action == "drag_and_drop" and suggestion_dict.get("parameters", {}).get("target_index") is None:
logger.error("LLM suggested action 'drag_and_drop' but missing required target_index.")
suggestion_failed = True
failure_reason = "LLM suggestion 'drag_and_drop' missing required parameter 'target_index'."
elif action == "type" and suggestion_dict.get("parameters", {}).get("text") is None:
logger.error(f"LLM suggested action 'type' but missing required text.")
suggestion_failed = True
failure_reason = f"LLM suggestion 'type' missing required parameter 'text'."
elif action == "suggestion_failed":
suggestion_failed = True
failure_reason = suggestion_dict.get("reasoning", "LLM indicated suggestion failed.")
elif action == "action_not_applicable":
pass # This is a valid outcome, handled below
else: # Should not happen if schema enum is enforced
logger.error(f"LLM returned unexpected action type: {action}")
suggestion_failed = True
failure_reason = f"LLM returned unknown action '{action}'."
elif isinstance(response_obj, str): # Handle error string
logger.error(f"[LLM Suggestion Failed] LLM returned an error string: {response_obj}")
self._add_to_history("LLM Suggestion Failed", {"raw_error_response": response_obj})
suggestion_failed = True
failure_reason = f"LLM error: {response_obj}"
else: # Handle unexpected type
logger.error(f"[LLM Suggestion Failed] Unexpected response type from generate_json: {type(response_obj)}")
self._add_to_history("LLM Suggestion Failed", {"response_type": str(type(response_obj))})
suggestion_failed = True
failure_reason = f"Unexpected response type: {type(response_obj)}"
# --- Process Suggestion ---
if suggestion_failed:
# Return a standardized failure dictionary
return {"action": "suggestion_failed", "parameters": {}, "reasoning": failure_reason}
# Handle successful suggestions (click, type, not_applicable)
if suggestion_dict["action"] in required_index_actions:
target_index = suggestion_dict["parameters"]["index"] # We validated index exists above
# --- Retrieve the node and pre-generated selector ---
if self._latest_dom_state is None or not self._latest_dom_state.selector_map:
logger.error("DOM state or selector map is missing, cannot lookup suggested index.")
return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Internal error: DOM state unavailable."}
target_node = self._latest_dom_state.selector_map.get(target_index)
if target_node is None:
available_indices = list(self._latest_dom_state.selector_map.keys())
logger.error(f"LLM suggested index [{target_index}], but it was not found in DOM context map. Available: {available_indices}")
return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Suggested element index [{target_index}] not found in current page context."}
suggested_selector = target_node.css_selector
if not suggested_selector:
# Try to generate it now if missing
suggested_selector = self.browser_controller.get_selector_for_node(target_node)
if suggested_selector:
target_node.css_selector = suggested_selector # Cache it
else:
logger.error(f"Could not generate selector for suggested index [{target_index}] (Node: {target_node.tag_name}).")
return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Failed to generate CSS selector for suggested index [{target_index}]."}
if suggested_selector and target_node and self.browser_controller.page:
try:
handles = self.browser_controller.page.query_selector_all(suggested_selector)
num_matches = len(handles)
validation_passed = False
if num_matches == 1:
# Get XPath of the element found by the generated selector
# Note: Requires the JS helper 'generateXPathForElement' (from xpathgenerator.js) to be defined within the evaluate call
try:
with resources.files(__package__).joinpath('js_utils', 'xpathgenerator.js') as js_path:
js_code = js_path.read_text(encoding='utf-8')
logger.debug("xpathgenerator.js loaded successfully.")
except FileNotFoundError:
logger.error("xpathgenerator.js not found in the 'agents' package directory!")
raise
except Exception as e:
logger.error(f"Error loading xpathgenerator.js: {e}", exc_info=True)
raise
# Pass a *function string* to evaluate. Playwright passes the handle as 'element'.
# The function string first defines our helper, then calls it.
script_to_run = f"""
(element) => {{
{js_code} // Define the helper function(s)
return generateXPathForElement(element); // Call it
}}
"""
# Use page.evaluate, passing the script string and the element handle
matched_xpath = self.browser_controller.page.evaluate(script_to_run, handles[0])
# Compare XPaths
if target_node.xpath == matched_xpath:
validation_passed = True
logger.info(f"Validation PASSED: Suggested selector '{suggested_selector}' uniquely matches target node [{target_index}].")
else:
logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched 1 element, but its XPath ('{matched_xpath}') differs from target node XPath ('{target_node.xpath}').")
elif num_matches == 0:
logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched 0 elements.")
else: # num_matches > 1
logger.warning(f"Validation FAILED: Suggested selector '{suggested_selector}' matched {num_matches} elements (not unique).")
# --- Fallback to XPath if validation failed ---
if not validation_passed:
logger.warning(f"Falling back to XPath selector for target node [{target_index}].")
original_selector = suggested_selector
suggested_selector = f"xpath={target_node.xpath}"
# Update the suggestion dictionary
suggestion_dict["suggested_selector"] = suggested_selector
suggestion_dict["reasoning"] = suggestion_dict.get("reasoning", "") + f" [Note: CSS selector ('{original_selector}') failed validation, using XPath fallback.]"
except Exception as validation_err:
logger.error(f"Error during selector validation ('{suggested_selector}'): {validation_err}. Falling back to XPath.")
original_selector = suggested_selector
suggested_selector = f"xpath={target_node.xpath}"
# Update the suggestion dictionary
suggestion_dict["suggested_selector"] = suggested_selector
suggestion_dict["reasoning"] = suggestion_dict.get("reasoning", "") + f" [Note: Error validating CSS selector ('{original_selector}'), using XPath fallback.]"
logger.info(f"LLM suggested index [{target_index}], resolved to selector: '{suggested_selector}'")
# Add resolved selector and node to the dictionary returned
suggestion_dict["suggested_selector"] = suggested_selector
suggestion_dict["target_node"] = target_node
if suggestion_dict["action"] == "drag_and_drop":
destination_index = suggestion_dict["parameters"].get("destination_index")
if destination_index is not None:
destination_node = self._latest_dom_state.selector_map.get(destination_index)
if destination_node is None:
available_indices = list(self._latest_dom_state.selector_map.keys())
logger.error(f"LLM suggested index [{destination_index}], but it was not found in DOM context map. Available: {available_indices}")
return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Suggested element index [{destination_index}] not found in current page context."}
destination_selector = destination_node.css_selector
if not destination_selector:
# Try to generate it now if missing
destination_selector = self.browser_controller.get_selector_for_node(destination_node)
if destination_selector:
destination_node.css_selector = destination_selector # Cache it
else:
logger.error(f"Could not generate selector for suggested index [{destination_index}] (Node: {destination_node.tag_name}).")
return {"action": "suggestion_failed", "parameters": {}, "reasoning": f"Failed to generate CSS selector for suggested index [{destination_index}]."}
suggestion_dict["destination_selector"] = destination_selector
suggestion_dict["destination_node"] = destination_node
logger.info(f"LLM suggested drag target index [{destination_index}], resolved to selector: '{destination_selector}'")
else: # Should have been caught by validation, but double-check
logger.error("LLM suggested drag_and_drop without destination_index.")
return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Drag and drop suggestion missing destination index."}
return suggestion_dict
elif suggestion_dict["action"] == "action_not_applicable":
# Pass this through directly
return suggestion_dict
else: # Should be unreachable given the checks above
logger.error("Reached unexpected point in suggestion processing.")
return {"action": "suggestion_failed", "parameters": {}, "reasoning": "Internal processing error after LLM response."}
def _execute_action_for_recording(self, action: str, selector: Optional[str], parameters: Dict[str, Any]) -> Dict[str, Any]:
"""
Executes a specific browser action (navigate, click, type, select, check/uncheck, scroll, key_press, drag_and_drop) during recording.
This is called *after* user confirmation/override. It does not involve AI decision.
"""
result = {"success": False, "message": f"Action '{action}' invalid.", "data": None}
if not action:
result["message"] = "No action specified for execution."
logger.warning(f"[RECORDER_EXEC] {result['message']}")
return result
logger.info(f"[RECORDER_EXEC] Executing: {action} | Selector: {selector} | Params: {parameters}")
self._add_to_history("Executing Recorder Action", {"action": action, "selector": selector, "parameters": parameters})
try:
if action == "navigate":
url = parameters.get("url")
if not url or not isinstance(url, str): raise ValueError("Missing or invalid 'url'.")
self.browser_controller.goto(url)
result["success"] = True
result["message"] = f"Navigated to {url}."
# Add implicit wait for load state after navigation
self.recorded_steps.append({
"step_id": self._current_step_id, # Use internal counter
"action": "wait_for_load_state",
"description": "Wait for page navigation to complete",
"parameters": {"state": "domcontentloaded"}, # Reasonable default
"selector": None,
"wait_after_secs": 0
})
self._current_step_id += 1 # Increment after adding implicit step
elif action == "click":
if not selector: raise ValueError("Missing selector for click action.")
self.browser_controller.click(selector)
time.sleep(0.5)
result["success"] = True
result["message"] = f"Clicked element: {selector}."
elif action == "type":
text = parameters.get("text")
if not selector: raise ValueError("Missing selector for type action.")
if text is None: raise ValueError("Missing or invalid 'text'.") # Allow empty string? yes.
self.browser_controller.type(selector, text)
result["success"] = True
result["message"] = f"Typed into element: {selector}."
elif action == "scroll": # Basic scroll support if planned
direction = parameters.get("direction")
if direction not in ["up", "down"]: raise ValueError("Invalid scroll direction.")
self.browser_controller.scroll(direction)
result["success"] = True
result["message"] = f"Scrolled {direction}."
elif action == "check":
if not selector: raise ValueError("Missing selector for check action.")
self.browser_controller.check(selector)
result["success"] = True
result["message"] = f"Checked element: {selector}."
elif action == "uncheck":
if not selector: raise ValueError("Missing selector for uncheck action.")
self.browser_controller.uncheck(selector)
result["success"] = True
result["message"] = f"Unchecked element: {selector}."
elif action == "select":
option_label = parameters.get("option_label")
# option_value = parameters.get("option_value") # If supporting value selection
if not selector: raise ValueError("Missing selector for select action.")
if not option_label: # and not option_value: # Prioritize label
raise ValueError("Missing 'option_label' parameter for select action.")
logger.info(f"Selecting option by label '{option_label}' in element: {selector}")
# Use the main browser_controller page reference
locator = self.browser_controller._get_locator(selector) # Use helper to get locator
# select_option can take label, value, or index
locator.select_option(label=option_label, timeout=self.browser_controller.default_action_timeout)
result["success"] = True
result["message"] = f"Selected option '{option_label}' in element: {selector}."
elif action == "key_press":
keys = parameters.get("keys")
if not selector: raise ValueError("Missing selector for key_press action.")
if not keys: raise ValueError("Missing 'keys' parameter for key_press action.")
self.browser_controller.press(selector, keys)
result["success"] = True
result["message"] = f"Pressed '{keys}' on element: {selector}."
elif action == "drag_and_drop":
destination_selector = parameters.get("destination_selector")
if not selector: raise ValueError("Missing source selector for drag_and_drop.")
if not destination_selector: raise ValueError("Missing 'destination_selector' parameter for drag_and_drop.")
self.browser_controller.drag_and_drop(selector, destination_selector)
result["success"] = True
result["message"] = f"Dragged '{selector}' to '{destination_selector}'."
# NOTE: "wait" actions are generally not *executed* during recording,
# they are just recorded based on the plan.
else:
result["message"] = f"Action '{action}' is not directly executable during recording via this method."
logger.warning(f"[RECORDER_EXEC] {result['message']}")
except (PlaywrightError, PlaywrightTimeoutError, ValueError) as e:
error_msg = f"Execution during recording failed for action '{action}' on selector '{selector}': {type(e).__name__}: {e}"
logger.error(f"[RECORDER_EXEC] {error_msg}", exc_info=False)
result["message"] = error_msg
result["success"] = False
# Optionally save screenshot on execution failure *during recording*
try:
ts = time.strftime("%Y%m%d_%H%M%S")
fname = f"output/recorder_exec_fail_{action}_{ts}.png"
self.browser_controller.save_screenshot(fname)
logger.info(f"Saved screenshot on recorder execution failure: {fname}")
except Exception: pass # Ignore screenshot errors here
except Exception as e:
error_msg = f"Unexpected Error during recorder execution action '{action}': {type(e).__name__}: {e}"
logger.critical(f"[RECORDER_EXEC] {error_msg}", exc_info=True)
result["message"] = error_msg
result["success"] = False
# Log Action Result
log_level = logging.INFO if result["success"] else logging.WARNING
logger.log(log_level, f"[RECORDER_EXEC_RESULT] Action '{action}' | Success: {result['success']} | Message: {result['message']}")
self._add_to_history("Recorder Action Result", {"success": result["success"], "message": result["message"]})
return result
# --- New Recorder Core Logic ---
def _handle_interactive_step_recording(self, planned_step: Dict[str, Any], suggestion: Dict[str, Any]) -> bool:
"""
Handles the user interaction loop for a suggested element action (click, type, select, check/uncheck, key_press, drag_and_drop).
Returns True if the step was successfully recorded (or skipped), False if aborted.
"""
action = suggestion["action"]
suggested_selector = suggestion["suggested_selector"] # source selector
target_node = suggestion["target_node"] # DOMElementNode # source node
destination_selector = suggestion.get("destination_selector")
destination_node = suggestion.get("destination_node")
parameters = suggestion["parameters"] # Contains index and potentially text
reasoning = suggestion.get("reasoning", "N/A")
planned_desc = planned_step["description"]
final_selector = None
performed_action = False
user_choice = None
parameter_name = None
action_recorded = False # Flag to track if we actually recorded the step
# --- Automated Mode ---
if self.automated_mode:
logger.info(f"[Auto Mode] Handling AI suggestion: Action='{action}', Target='{target_node.tag_name}' (Reason: {reasoning})")
logger.info(f"[Auto Mode] Suggested Selector: {suggested_selector}")
if action == "drag_and_drop":
logger.info(f"[Auto Mode] Suggested Target Selector: {destination_selector}")
self.browser_controller.clear_highlights()
self.browser_controller.highlight_element(suggested_selector, target_node.highlight_index, color="#FFA500", text="AI Suggestion")
if action == "drag_and_drop" and destination_selector and destination_node:
self.browser_controller.highlight_element(destination_selector, destination_node.highlight_index, color="#0000FF", text="AI Suggestion (Target)")
# Directly accept AI suggestion
final_selector = suggested_selector
final_destination_selector = destination_selector
logger.info(f"[Auto Mode] Automatically accepting AI suggestion.")
exec_params = parameters.copy() # Start with base params
if action == "drag_and_drop":
exec_params["destination_selector"] = final_destination_selector # Add target for execution
# Execute action on AI's suggested selector
exec_result = self._execute_action_for_recording(action, final_selector, exec_params)
performed_action = exec_result["success"]
if performed_action:
# Record the successful step automatically
record = {
"step_id": self._current_step_id, "action": action, "description": planned_desc,
"parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION
}
if action == "type":
# Include text, but no parameterization prompt
record["parameters"]["text"] = parameters.get("text", "")
elif action == "select":
# Ensure 'option_label' from the suggestion's parameters is added
if "option_label" in parameters:
record["parameters"]["option_label"] = parameters["option_label"]
if "option_value" in parameters: record["parameters"]["option_value"] = parameters["option_value"]
if "option_index" in parameters: record["parameters"]["option_index"] = parameters["option_index"]
elif action == "key_press":
record["parameters"]["keys"] = parameters.get("keys", "")
elif action == "drag_and_drop":
record["parameters"]["destination_selector"] = final_destination_selector
self.recorded_steps.append(record)
self._current_step_id += 1
logger.info(f"Step {record['step_id']} recorded (AI Suggestion - Automated): {action} on {final_selector}")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded AI suggestion (automated) as step {record['step_id']}")
return True # Success
else: # AI suggestion execution failed
logger.error(f"[Auto Mode] Execution FAILED using AI suggested selector: {exec_result['message']}")
# Mark as failed for potential re-planning or retry in the main loop
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "failed", error=f"Automated execution failed: {exec_result['message']}")
self.browser_controller.clear_highlights()
self.panel.hide_recorder_panel()
# Do not abort automatically, let the main loop handle failure/retry logic
return True # Indicate handled (failure noted), loop continues
# --- Interactive Mode ---
else:
print("\n" + "="*60)
print(f"Planned Step: {planned_desc}")
print(f"AI Suggestion: Action='{action}', Target='{target_node.tag_name}' (Reason: {reasoning})")
print(f"Suggested Selector: {suggested_selector}")
if action == "drag_and_drop":
print(f"Suggested Selector (Destination): {destination_selector}")
print("="*60)
# Highlight suggested element
self.browser_controller.clear_highlights()
self.browser_controller.highlight_element(suggested_selector, target_node.highlight_index, color="#FFA500", text="AI Suggestion") # Orange for suggestion
if action == "drag_and_drop" and destination_selector and destination_node:
self.browser_controller.highlight_element(destination_selector, destination_node.highlight_index, color="#0000FF", text="AI Suggestion (Destination)")
# Show the UI Panel with options
suggestion_display_text = f"'{action}' on <{target_node.tag_name}>"
if action == "type": suggestion_display_text += f" with text '{parameters.get('text', '')[:20]}...'"
elif action == "key_press": suggestion_display_text += f" with key(s) '{parameters.get('keys', '')}'"
elif action == "drag_and_drop": suggestion_display_text += f" to <{destination_node.tag_name if destination_node else 'N/A'}>"
self.panel.show_recorder_panel(planned_desc, suggestion_display_text)
# Setup listener *after* showing panel, *before* waiting
listener_setup = self.browser_controller.setup_click_listener()
if not listener_setup:
logger.error("Failed to set up click listener, cannot proceed with override.")
self.panel.hide_recorder_panel()
return False # Abort
# --- Wait for User Interaction (Click Override OR Panel Button) ---
# Total time budget for interaction
TOTAL_INTERACTION_TIMEOUT = 20.0 # e.g., 20 seconds total
PANEL_WAIT_TIMEOUT = 15.0 # Time to wait for panel *after* click timeout
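# With the defaults above, the click-override window is TOTAL_INTERACTION_TIMEOUT -
# PANEL_WAIT_TIMEOUT = 5 seconds; the remaining 15 seconds are given to the panel.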
override_selector = None
try:
logger.debug("Waiting for user click override...")
# Wait for click first (short timeout)
click_wait_time = TOTAL_INTERACTION_TIMEOUT - PANEL_WAIT_TIMEOUT
override_selector = self.browser_controller.wait_for_user_click_or_timeout(click_wait_time)
if override_selector:
print(f"\n[Recorder] User override detected! Using selector: {override_selector}")
user_choice = 'override' # Special internal choice
else:
# Click timed out, now wait for panel interaction
logger.debug("No click override. Waiting for panel interaction...")
user_choice = self.panel.wait_for_panel_interaction(PANEL_WAIT_TIMEOUT)
if user_choice:
print(f"\n[Recorder] User choice via panel: {user_choice}")
else:
print("\n[Recorder] Timeout waiting for panel interaction. Skipping step.")
user_choice = 'skip' # Default to skip on timeout
except Exception as e:
logger.error(f"Error during user interaction wait: {e}", exc_info=True)
user_choice = 'abort' # Abort on unexpected error
# --- Process User Choice ---
if user_choice == 'override':
final_selector = override_selector
final_destination_selector = destination_selector if action == "drag_and_drop" else None
performed_action = False
print(f"Executing original action '{action}' on overridden selector...")
exec_params = parameters.copy()
if action == "drag_and_drop":
exec_params["destination_selector"] = final_destination_selector
exec_result = self._execute_action_for_recording(action, final_selector, exec_params)
performed_action = exec_result["success"]
if performed_action:
# --- Ask for Parameterization (for 'type' action) ---
if action == "type":
param_text = parameters.get("text", "")
if self.panel.prompt_parameterization_in_panel(param_text):
print(f"Parameterize '{param_text[:30]}...'? Enter name in panel or leave blank...")
param_choice = self.panel.wait_for_panel_interaction(15.0) # Wait for param submit
if param_choice == 'parameterized':
parameter_name = self.panel.get_parameterization_result()
print(f"Parameter name set to: '{parameter_name}'" if parameter_name else "No parameter name entered.")
else:
print("Parameterization skipped or timed out.")
else:
print("Could not show parameterization UI.")
# --- Record the override step ---
record = { "step_id": self._current_step_id, "action": action, "description": planned_desc,
"parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION }
if action == "type": record["parameters"]["text"] = parameters.get("text", "")
elif action == "select":
# Assume original parameters (like option_label) still apply for override
if "option_label" in parameters:
record["parameters"]["option_label"] = parameters["option_label"]
elif action == "key_press": record["parameters"]["keys"] = parameters.get("keys", "")
elif action == "drag_and_drop": record["parameters"]["destination_selector"] = final_destination_selector
if parameter_name: record["parameters"]["parameter_name"] = parameter_name
self.recorded_steps.append(record)
self._current_step_id += 1
logger.info(f"Step {record['step_id']} recorded (User Override): {action} on {final_selector}")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded override as step {record['step_id']}")
action_recorded = True
else:
# Override execution failed
print(f"WARNING: Execution failed using override selector: {exec_result['message']}")
# Ask to skip or abort via panel again? Simpler to just skip here.
print("Skipping step after failed override execution.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Skipped after failed override execution")
elif user_choice == 'accept':
print("Accepting AI suggestion.")
final_selector = suggested_selector
final_destination_selector = destination_selector if action == "drag_and_drop" else None
performed_action = False
exec_params = parameters.copy()
if action == "drag_and_drop":
exec_params["destination_selector"] = final_destination_selector
exec_result = self._execute_action_for_recording(action, final_selector, exec_params)
performed_action = exec_result["success"]
if performed_action:
# --- Ask for Parameterization ---
if action == "type":
param_text = parameters.get("text", "")
if self.panel.prompt_parameterization_in_panel(param_text):
print(f"Parameterize '{param_text[:30]}...'? Enter name in panel or leave blank...")
param_choice = self.panel.wait_for_panel_interaction(15.0)
if param_choice == 'parameterized':
parameter_name = self.panel.get_parameterization_result()
print(f"Parameter name set to: '{parameter_name}'" if parameter_name else "No parameter name entered.")
else:
print("Parameterization skipped or timed out.")
else:
print("Could not show parameterization UI.")
# --- Record the accepted AI suggestion ---
record = { "step_id": self._current_step_id, "action": action, "description": planned_desc,
"parameters": {}, "selector": final_selector, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION }
if action == "type": record["parameters"]["text"] = parameters.get("text", "")
elif action == "select":
if "option_label" in parameters:
record["parameters"]["option_label"] = parameters["option_label"]
elif action == "key_press": record["parameters"]["keys"] = parameters.get("keys", "")
elif action == "drag_and_drop": record["parameters"]["destination_selector"] = final_destination_selector
if parameter_name: record["parameters"]["parameter_name"] = parameter_name
self.recorded_steps.append(record)
self._current_step_id += 1
logger.info(f"Step {record['step_id']} recorded (AI Suggestion): {action} on {final_selector}")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded AI suggestion as step {record['step_id']}")
action_recorded = True
else:
# AI suggestion execution failed
print(f"WARNING: Execution failed using AI suggested selector: {exec_result['message']}")
# Ask to retry/skip/abort via panel again? Or mark as failed for main loop retry?
# Let's mark as failed for retry by the main loop.
print("Marking step for retry after execution failure...")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "failed", error=f"Execution failed: {exec_result['message']}")
# Action was NOT recorded in this case
elif user_choice == 'skip':
print("Skipping planned step.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped via panel")
elif user_choice == 'abort':
print("Aborting recording process.")
self._user_abort_recording = True
# No need to update task manager status if aborting globally
else: # Should not happen with panel, but handle defensively (e.g., timeout resulted in None)
print("Invalid choice or timeout. Skipping step.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Invalid user choice or timeout")
# --- Cleanup UI after interaction ---
self.browser_controller.clear_highlights()
self.panel.hide_recorder_panel()
if listener_setup:
self.browser_controller.remove_click_listener() # Ensure listener removed
return not self._user_abort_recording # Return True if handled (recorded/skipped/failed for retry), False only on ABORT
def _get_llm_assertion_target_index(self, planned_desc: str, dom_context_str: str) -> Tuple[Optional[int], Optional[str]]:
"""Helper function to ask LLM for the target index for an assertion."""
# NOTE: stub kept for the annotated (index, reasoning) return shape; the actual
# target-index lookup is currently performed inline in _handle_assertion_recording below.
return None, None
def _handle_assertion_recording(self, planned_step: Dict[str, Any]) -> bool:
"""
Handles prompting the user for assertion details based on a 'Verify...' planned step.
Returns True if recorded/skipped, False if aborted.
"""
if self.automated_mode:
logger.error("[Auto Mode] Reached manual assertion handler. This indicates verification fallback failed or wasn't triggered. Skipping step.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="Skipped (Manual assertion handler reached in auto mode)")
return True # Skip
planned_desc = planned_step["description"]
logger.info(f"Starting interactive assertion definition for: '{planned_desc}'")
print("\n" + "="*60)
print(f"Planned Step: {planned_desc}")
print("Define Assertion via UI Panel...")
print("="*60)
current_state = "target_selection" # States: target_selection, type_selection, param_input
suggested_selector = None
final_selector = None
assertion_action = None
assertion_params = {}
llm_target_suggestion_failed = False # Track if initial suggestion failed
try:
# 1. Identify Target Element (Use LLM again, simplified prompt)
# We need a selector for the element to assert against.
current_url = self.browser_controller.get_current_url()
dom_context_str = "Error getting DOM"
if self._latest_dom_state:
dom_context_str, _ = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification')
prompt = f"""
Given the verification step: "{planned_desc}"
And the current interactive elements context (with indices; non-visible elements in the tree can also be interacted with):
```html
{dom_context_str}
```
Identify the element index `[index]` most relevant to this verification task.
Respond ONLY with a JSON object matching the schema:
{{
"index": INDEX_NUMBER_OR_NULL,
"reasoning": "OPTIONAL_REASONING_IF_NULL"
}}
Example Output (Found): {{"index": 5}}
Example Output (Not Found): {{"index": null, "reasoning": "Cannot determine a single target element for 'verify presence of error'."}}
"""
logger.debug(f"[LLM ASSERT PROMPT] Sending prompt for assertion target index:\n{prompt[:500]}...")
response_obj = self.llm_client.generate_json(AssertionTargetIndexSchema, prompt)
target_index = None
llm_reasoning = "LLM did not provide a target index or reasoning." # Default
if isinstance(response_obj, AssertionTargetIndexSchema):
logger.debug(f"[LLM ASSERT RESPONSE] Parsed index response: {response_obj}")
target_index = response_obj.index # Will be None if null in JSON
if target_index is None and response_obj.reasoning:
llm_reasoning = response_obj.reasoning
elif target_index is None:
llm_reasoning = "LLM did not identify a target element (index is null)."
elif isinstance(response_obj, str): # Handle error string
logger.error(f"[LLM ASSERT RESPONSE] Failed to get target index JSON: {response_obj}")
llm_reasoning = f"LLM error getting target index: {response_obj}"
else: # Handle unexpected type
logger.error(f"[LLM ASSERT RESPONSE] Unexpected response type for target index: {type(response_obj)}")
llm_reasoning = f"Unexpected LLM response type: {type(response_obj)}"
target_node = None
target_selector = None
if target_index is not None:
if self._latest_dom_state and self._latest_dom_state.selector_map:
target_node = self._latest_dom_state.selector_map.get(target_index)
if target_node and target_node.css_selector:
suggested_selector = target_node.css_selector
print(f"AI suggests target [Index: {target_index}]: <{target_node.tag_name}>")
print(f" Selector: `{suggested_selector}`")
self.browser_controller.clear_highlights()
self.browser_controller.highlight_element(suggested_selector, target_index, color="#0000FF", text="Assert Target?")
else:
print(f"AI suggested index [{target_index}], but element/selector not found.")
llm_target_suggestion_failed = True
else:
print(f"AI suggested index [{target_index}], but DOM map unavailable.")
llm_target_suggestion_failed = True
else:
print(f"AI could not suggest a target element. Reason: {llm_reasoning}")
llm_target_suggestion_failed = True # Mark as failed if no index
except Exception as e:
logger.error(f"Error getting initial assertion target suggestion: {e}", exc_info=True)
print(f"Error getting AI suggestion: {e}")
llm_target_suggestion_failed = True
# --- User confirms/overrides target selector ---
while True:
user_choice = None
override_selector = None
# --- State 1: Target Selection ---
if current_state == "target_selection":
print("Panel State: Confirm or Override Target Selector.")
self.panel.show_assertion_target_panel(planned_desc, suggested_selector)
listener_setup = self.browser_controller.setup_click_listener()
if not listener_setup:
logger.error("Failed to set up click listener for override.")
user_choice = 'abort' # Force abort if listener fails
else:
# Wait for click override OR panel interaction
try:
logger.debug("Waiting for user click override (Assertion Target)...")
override_selector = self.browser_controller.wait_for_user_click_or_timeout(5.0) # 5s for click override
if override_selector:
print(f"\n[Recorder] User override target detected! Using selector: {override_selector}")
user_choice = 'override_target_confirmed' # Internal choice after click
else:
logger.debug("No click override. Waiting for panel interaction (Assertion Target)...")
user_choice = self.panel.wait_for_panel_interaction(15.0) # Wait longer for panel
if not user_choice: user_choice = 'skip' # Default to skip on panel timeout
except Exception as e:
logger.error(f"Error during assertion target interaction wait: {e}", exc_info=True)
user_choice = 'abort'
# --- Process Target Choice ---
self.browser_controller.remove_click_listener() # Remove listener after this stage
if user_choice == 'confirm_target':
if suggested_selector:
final_selector = suggested_selector
print(f"Using suggested target: {final_selector}")
current_state = "type_selection" # Move to next state
continue # Restart loop in new state
else:
print("Error: Cannot confirm target, no suggestion was available.")
# Stay in this state or treat as skip? Let's allow retry.
current_state = "target_selection"
continue
elif user_choice == 'override_target_confirmed': # Came from click override
if override_selector:
final_selector = override_selector
print(f"Using override target: {final_selector}")
# Try highlighting the user's choice
try:
self.browser_controller.clear_highlights()
self.browser_controller.highlight_element(final_selector, 0, color="#00FF00", text="User Target")
except Exception as e:
print(f"Warning: Could not highlight user selector '{final_selector}': {e}")
current_state = "type_selection" # Move to next state
continue # Restart loop in new state
else:
print("Error: Override click detected but no selector captured.")
current_state = "target_selection" # Stay here
continue
elif user_choice == 'override_target': # Clicked button in panel to enable clicking
print("Click the element on the page you want to assert against...")
self.panel.hide_recorder_panel() # Hide panel while clicking
listener_setup = self.browser_controller.setup_click_listener()
if not listener_setup:
logger.error("Failed to set up click listener for override.")
user_choice = 'abort'
else:
try:
override_selector = self.browser_controller.wait_for_user_click_or_timeout(20.0) # Longer wait for manual click
if override_selector:
print(f"\n[Recorder] User override target selected: {override_selector}")
user_choice = 'override_target_confirmed' # Set internal choice
else:
print("Timeout waiting for override click. Please try again.")
user_choice = None # Force loop restart in target_selection
except Exception as e:
logger.error(f"Error during manual override click wait: {e}", exc_info=True)
user_choice = 'abort'
self.browser_controller.remove_click_listener() # Remove listener
if user_choice == 'override_target_confirmed':
final_selector = override_selector
try:
self.browser_controller.clear_highlights()
self.browser_controller.highlight_element(final_selector, 0, color="#00FF00", text="User Target")
except Exception as e: print(f"Warning: Could not highlight user selector: {e}")
current_state = "type_selection"
continue
elif user_choice == 'abort':
self._user_abort_recording = True; break # Exit loop
else: # Timeout or error during manual click
current_state = "target_selection" # Go back to target panel
continue
elif user_choice == 'skip':
print("Skipping assertion definition.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion")
break # Exit loop, return True
elif user_choice == 'abort':
self._user_abort_recording = True; break # Exit loop, return False
else: # Includes timeout, None, unexpected values
print("Invalid choice or timeout. Please select target action.")
current_state = "target_selection" # Stay here
continue
# --- State 2: Type Selection ---
elif current_state == "type_selection":
if not final_selector: # Should not happen if logic is correct
logger.error("Assertion state error: Reached type selection without a final selector.")
current_state = "target_selection"; continue # Go back
print("Panel State: Select Assertion Type.")
self.panel.show_assertion_type_panel(final_selector)
user_choice = self.panel.wait_for_panel_interaction(20.0) # Wait for type selection
if not user_choice: user_choice = 'skip' # Default to skip on timeout
# --- Process Type Choice ---
if user_choice.startswith('select_type_'):
type_suffix = user_choice.split('select_type_')[-1]
# Map suffix to actual action string
action_map = {
'text_contains': "assert_text_contains", 'text_equals': "assert_text_equals",
'visible': "assert_visible", 'hidden': "assert_hidden",
'attribute_equals': "assert_attribute_equals", 'element_count': "assert_element_count",
'checked': "assert_checked", 'not_checked': "assert_not_checked", "disabled": "assert_disabled", "enabled": "assert_enabled", "vision_llm": "assert_llm_verification"
}
assertion_action = action_map.get(type_suffix)
if not assertion_action:
print(f"Error: Unknown assertion type selected '{type_suffix}'.")
current_state = "type_selection"; continue # Ask again
print(f"Assertion type selected: {assertion_action}")
# Check if parameters are needed
needs_params_map = {
"assert_text_contains": ["Expected Text"], "assert_text_equals": ["Expected Text"],
"assert_attribute_equals": ["Attribute Name", "Expected Value"],
"assert_element_count": ["Expected Count"]
}
if assertion_action in needs_params_map:
current_state = "param_input" # Move to param state
continue # Restart loop in new state
else:
assertion_params = {} # No params needed
# Proceed directly to recording
break # Exit loop to record
elif user_choice == 'back_to_target':
current_state = "target_selection"; continue # Go back
elif user_choice == 'skip':
print("Skipping assertion definition.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion")
break # Exit loop, return True
elif user_choice == 'abort':
self._user_abort_recording = True; break # Exit loop, return False
else: # Includes timeout, None, unexpected values
print("Invalid choice or timeout. Please select assertion type.")
current_state = "type_selection"; continue # Stay here
# --- State 3: Parameter Input ---
elif current_state == "param_input":
if not final_selector or not assertion_action: # Should not happen
logger.error("Assertion state error: Reached param input without selector or action.")
current_state = "target_selection"; continue # Go way back
print("Panel State: Enter Assertion Parameters.")
needs_params_map = { # Redefine here for clarity
"assert_text_contains": ["Expected Text"], "assert_text_equals": ["Expected Text"],
"assert_attribute_equals": ["Attribute Name", "Expected Value"],
"assert_element_count": ["Expected Count"]
}
param_labels = needs_params_map.get(assertion_action, [])
self.panel.show_assertion_params_panel(final_selector, assertion_action, param_labels)
user_choice = self.panel.wait_for_panel_interaction(60.0) # Longer timeout for typing
if not user_choice: user_choice = 'skip' # Default to skip on timeout
# --- Process Param Choice ---
if user_choice == 'submit_params':
raw_params = self.panel.get_assertion_parameters_from_panel(len(param_labels))
if raw_params is None:
print("Error retrieving parameters from panel. Please try again.")
current_state = "param_input"; continue # Stay here
# Map raw_params (param1, param2) to specific keys
assertion_params = {}
try:
if assertion_action == "assert_text_contains" or assertion_action == "assert_text_equals":
assertion_params["expected_text"] = raw_params.get("param1", "")
elif assertion_action == "assert_attribute_equals":
assertion_params["attribute_name"] = raw_params.get("param1", "")
assertion_params["expected_value"] = raw_params.get("param2", "")
if not assertion_params["attribute_name"]: raise ValueError("Attribute name cannot be empty.")
elif assertion_action == "assert_element_count":
count_str = raw_params.get("param1", "")
if not count_str.isdigit(): raise ValueError("Expected count must be a number.")
assertion_params["expected_count"] = int(count_str)
print(f"Parameters submitted: {assertion_params}")
break # Exit loop to record
except ValueError as ve:
print(f"Input Error: {ve}. Please correct parameters.")
current_state = "param_input"; continue # Stay here to retry
elif user_choice == 'back_to_type':
current_state = "type_selection"; continue # Go back
elif user_choice == 'abort':
self._user_abort_recording = True; break # Exit loop, return False
else: # Includes skip, timeout, None, unexpected values
print("Skipping assertion definition.")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "skipped", result="User skipped assertion parameters")
break # Exit loop, return True
else: # Should not happen
logger.error(f"Assertion state error: Unknown state '{current_state}'. Aborting assertion.")
self._user_abort_recording = True; break # Exit loop, return False
# --- End of State Machine Loop ---
self.panel.hide_recorder_panel() # Ensure panel is hidden
# --- Record Step if not Aborted/Skipped ---
if not self._user_abort_recording and assertion_action and final_selector:
# Check if loop exited normally for recording vs. skipping
task_status = self.task_manager.subtasks[self.task_manager.current_subtask_index]['status']
if task_status != "skipped": # Only record if not explicitly skipped
record = {
"step_id": self._current_step_id,
"action": assertion_action,
"description": planned_desc, # Use original planned description
"parameters": assertion_params,
"selector": final_selector,
"wait_after_secs": 0 # Assertions usually don't need waits after
}
self.recorded_steps.append(record)
self._current_step_id += 1
logger.info(f"Step {record['step_id']} recorded: {assertion_action} on {final_selector}")
self.task_manager.update_subtask_status(self.task_manager.current_subtask_index, "done", result=f"Recorded as assertion step {record['step_id']}")
return True
else:
logger.info("Assertion definition skipped by user.")
return True # Skipped successfully
elif self._user_abort_recording:
logger.warning("Assertion definition aborted by user.")
return False # Aborted
else:
# Loop exited without recording (likely due to skip choice)
logger.info("Assertion definition finished without recording (likely skipped).")
# Task status should already be 'skipped' from within the loop
return True
def record(self, feature_description: str) -> Dict[str, Any]:
"""
Runs the interactive test recording process with LLM verification and dynamic re-planning.
"""
if not self.is_recorder_mode:
logger.error("Cannot run record() method when not in recorder mode.")
return {"success": False, "message": "Agent not initialized in recorder mode."}
automation_status = "Automated" if self.automated_mode else "Interactive"
logger.info(f"--- Starting Test Recording ({automation_status}) --- Feature: {feature_description}")
if not self.automated_mode:
print(f"\n--- Starting Recording for Feature ({automation_status}) ---\n{feature_description}\n" + "-"*35)
start_time = time.time()
# Initialize recording status
recording_status = {
"success": False,
"feature": feature_description,
"message": "Recording initiated.",
"output_file": None,
"steps_recorded": 0,
"duration_seconds": 0.0,
}
# Reset state for a new recording session
self.history = []
self.recorded_steps = []
self._current_step_id = 1
self.output_file_path = None
self._latest_dom_state = None
self._user_abort_recording = False
self._consecutive_suggestion_failures = 0
self._last_failed_step_index = -1
try:
logger.debug("[RECORDER] Starting browser controller...")
self.browser_controller.start()
self.browser_controller.clear_console_messages()
self.task_manager.set_main_task(feature_description)
logger.debug("[RECORDER] Planning initial steps...")
self._plan_subtasks(feature_description) # Generates the list of planned steps
if not self.task_manager.subtasks:
recording_status["message"] = "❌ Recording Planning Failed: No steps generated."
raise ValueError(recording_status["message"]) # Use ValueError for planning failure
logger.info(f"Beginning interactive recording for {len(self.task_manager.subtasks)} initial planned steps...")
iteration_count = 0 # General loop counter for safety
MAX_RECORDING_ITERATIONS = self.max_iterations * 2 # Allow more iterations for potential recovery steps
while iteration_count < MAX_RECORDING_ITERATIONS:
iteration_count += 1
planned_steps_count = len(self.task_manager.subtasks) # Get current count
current_planned_task = self.task_manager.get_next_subtask()
if self._user_abort_recording: # Abort check
recording_status["message"] = f"Recording aborted by {'user' if not self.automated_mode else 'AI'} because {self._abort_reason if self.automated_mode else 'User had some chores to do'}."
logger.warning(recording_status["message"])
break
if not current_planned_task:
# Check if finished normally or failed planning/retries
if self.task_manager.is_complete():
# Check if ANY task failed permanently
perm_failed_tasks = [t for t in self.task_manager.subtasks if t['status'] == 'failed' and t['attempts'] > self.task_manager.max_retries_per_subtask]
logger.debug(f"Permanently failed tasks: {perm_failed_tasks}")
if perm_failed_tasks:
first_failed_idx = self.task_manager.subtasks.index(perm_failed_tasks[0])
failed_task = perm_failed_tasks[0]
recording_status["message"] = f"Recording process completed with failures. First failed step #{first_failed_idx+1}: {failed_task['description']} (Error: {failed_task['result']})"
recording_status["success"] = False # Mark as failed overall
logger.error(recording_status["message"])
elif all(t['status'] in ['done', 'skipped'] for t in self.task_manager.subtasks):
logger.info("All planned steps processed or skipped successfully.")
recording_status["message"] = "Recording process completed."
recording_status["success"] = True # Mark as success ONLY if no permanent failures
else:
# Should not happen if is_complete is true and perm_failed is empty
recording_status["message"] = "Recording finished, but final state inconsistent."
recording_status["success"] = False
logger.warning(recording_status["message"])
else:
recording_status["message"] = "Recording loop ended unexpectedly (no actionable tasks found)."
recording_status["success"] = False
logger.error(recording_status["message"])
break # Exit loop
# Add index to the task dictionary for easier reference
current_task_index = self.task_manager.current_subtask_index
current_planned_task['index'] = current_task_index
logger.info(f"\n===== Processing Planned Step {current_task_index + 1}/{planned_steps_count} (Attempt {current_planned_task['attempts']}) =====")
if not self.automated_mode: print(f"\nProcessing Step {current_task_index + 1}: {current_planned_task['description']}")
# --- Reset Consecutive Failure Counter if step index changes ---
if self._last_failed_step_index != current_task_index:
self._consecutive_suggestion_failures = 0
self._last_failed_step_index = current_task_index # Update last processed index
# --- State Gathering ---
logger.info("Gathering browser state and structured DOM...")
current_url = "Error: Could not get URL"
dom_context_str = "Error: Could not process DOM"
static_id_map = {}
screenshot_bytes = None # Initialize screenshot bytes
self._latest_dom_state = None
self.browser_controller.clear_highlights() # Clear previous highlights
self.panel.hide_recorder_panel()
try:
current_url = self.browser_controller.get_current_url()
# Always try to get DOM state
self._latest_dom_state = self.browser_controller.get_structured_dom(highlight_all_clickable_elements=False, viewport_expansion=-1)
if self._latest_dom_state and self._latest_dom_state.element_tree:
dom_context_str, static_id_map = self._latest_dom_state.element_tree.generate_llm_context_string(context_purpose='verification')
self._last_static_id_map = static_id_map
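# Hedged note: static_id_map is assumed to map the temporary static-element IDs embedded in
# dom_context_str back to their DOM nodes, so later verification handling can resolve a static
# reference into a concrete selector.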
else:
dom_context_str = "Error processing DOM structure."
self._last_static_id_map = {}
logger.error("[RECORDER] Failed to get valid DOM state.")
# Get screenshot, especially useful for verification/re-planning
screenshot_bytes = self.browser_controller.take_screenshot()
except Exception as e:
logger.error(f"Failed to gather browser state/DOM/Screenshot: {e}", exc_info=True)
dom_context_str = f"Error gathering state: {e}"
# Allow proceeding, LLM might handle navigation or re-planning might trigger
# --- Handle Step Type ---
planned_step_desc_lower = current_planned_task['description'].lower()
step_handled_internally = False # Flag to indicate if step logic was fully handled here
# --- 1. Verification Step ---
if planned_step_desc_lower.startswith(("verify", "assert")):
previous_error = current_planned_task.get("error") # Get validation error from last attempt
logger.info("Handling verification step using LLM...")
verification_result = self._get_llm_verification(
verification_description=current_planned_task['description'],
current_url=current_url,
dom_context_str=dom_context_str,
static_id_map=static_id_map,
screenshot_bytes=screenshot_bytes,
previous_error=previous_error
)
if verification_result:
# Handle user confirmation & recording based on LLM result
handled_ok = self._handle_llm_verification(current_planned_task, verification_result)
if not handled_ok and self._user_abort_recording:
logger.warning("User aborted during verification handling.")
break
else:
# LLM verification failed, fallback to manual
failure_reason = "LLM call/parse failed for verification."
logger.error(f"[Verification] {failure_reason}")
if self.automated_mode:
logger.error("[Auto Mode] LLM verification call failed. Skipping step.")
self.task_manager.update_subtask_status(current_task_index, "skipped", result="Skipped (LLM verification failed)")
else:
print("AI verification failed. Falling back to manual assertion definition.")
if not self._handle_assertion_recording(current_planned_task): # Manual handler
self._user_abort_recording = True
step_handled_internally = True # Verification is fully handled here or in called methods
# --- 2. Navigation Step ---
elif planned_step_desc_lower.startswith("navigate to"):
try:
parts = re.split("navigate to", current_planned_task['description'], maxsplit=1, flags=re.IGNORECASE)
if len(parts) > 1 and parts[1].strip():
url = parts[1].strip()
# print(f"Action: Navigate to {url}")
exec_result = self._execute_action_for_recording("navigate", None, {"url": url})
if exec_result["success"]:
# Record navigation step + implicit wait
nav_step_id = self._current_step_id
self.recorded_steps.append({
"step_id": nav_step_id, "action": "navigate", "description": current_planned_task['description'],
"parameters": {"url": url}, "selector": None, "wait_after_secs": 0 # Wait handled by wait_for_load_state
})
self._current_step_id += 1
self.recorded_steps.append({ # Add implicit wait
"step_id": self._current_step_id, "action": "wait_for_load_state", "description": "Wait for page navigation",
"parameters": {"state": "domcontentloaded"}, "selector": None, "wait_after_secs": DEFAULT_WAIT_AFTER_ACTION
})
self._current_step_id += 1
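# Illustrative pair of recorded steps for a navigation (hedged example; values are hypothetical):
#   {"step_id": 3, "action": "navigate", "parameters": {"url": "https://example.com/login"}, "selector": None, "wait_after_secs": 0}
#   {"step_id": 4, "action": "wait_for_load_state", "parameters": {"state": "domcontentloaded"}, "selector": None, "wait_after_secs": 0.5}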
logger.info(f"Steps {nav_step_id}, {self._current_step_id-1} recorded: navigate and wait")
self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded navigation")
self._consecutive_suggestion_failures = 0 # Reset failure counter on success
else:
# NAVIGATION FAILED - Potential trigger for re-planning
logger.error(f"Navigation failed: {exec_result['message']}")
reason = f"Navigation to '{url}' failed: {exec_result['message']}"
# Try re-planning instead of immediate skip/abort
if self._trigger_re_planning(current_planned_task, reason):
logger.info("Re-planning successful, continuing with recovery steps.")
# Recovery steps inserted, loop will pick them up
else:
# Re-planning failed or user aborted/skipped recovery
if not self._user_abort_recording: # Check if abort wasn't the reason
logger.warning("Re-planning failed or declined after navigation failure. Skipping original step.")
# Status already updated by _trigger_re_planning if skipped/aborted
else:
raise ValueError("Could not parse URL after 'navigate to'.")
except Exception as nav_e:
logger.error(f"Error processing navigation step '{current_planned_task['description']}': {nav_e}")
reason = f"Error processing navigation step: {nav_e}"
if self._trigger_re_planning(current_planned_task, reason):
logger.info("Re-planning successful after navigation processing error.")
else:
if not self._user_abort_recording:
logger.warning("Re-planning failed/declined. Marking original navigation step failed.")
self.task_manager.update_subtask_status(current_task_index, "failed", error=reason) # Mark as failed if no recovery
step_handled_internally = True # Navigation handled
# --- 3. Scroll Step ---
elif planned_step_desc_lower.startswith("scroll"):
try:
direction = "down" if "down" in planned_step_desc_lower else "up" if "up" in planned_step_desc_lower else None
if direction:
exec_result = self._execute_action_for_recording("scroll", None, {"direction": direction})
if exec_result["success"]:
self.recorded_steps.append({
"step_id": self._current_step_id, "action": "scroll", "description": current_planned_task['description'],
"parameters": {"direction": direction}, "selector": None, "wait_after_secs": 0.2
})
self._current_step_id += 1
logger.info(f"Step {self._current_step_id-1} recorded: scroll {direction}")
self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded scroll")
self._consecutive_suggestion_failures = 0 # Reset failure counter
else:
self.task_manager.update_subtask_status(current_task_index, "skipped", result="Optional scroll failed")
else:
self.task_manager.update_subtask_status(current_task_index, "skipped", result="Unknown scroll direction")
except Exception as scroll_e:
logger.error(f"Error handling scroll step: {scroll_e}")
self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Scroll step failed: {scroll_e}") # Mark failed
step_handled_internally = True # Scroll handled
# --- 4. Visual Baseline Capture Step ---
elif planned_step_desc_lower.startswith("visually baseline"):
planned_desc = current_planned_task['description']
logger.info(f"Handling planned step: '{planned_desc}'")
target_description = planned_step_desc_lower.replace("visually baseline the", "").strip()
default_baseline_id = re.sub(r'\s+', '_', target_description) # Generate default ID
default_baseline_id = re.sub(r'[^\w\-]+', '', default_baseline_id)[:50] # Sanitize
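# Illustrative derivation (hypothetical step text): "Visually baseline the login form" -> target "login form" -> default ID "login_form"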
baseline_id = None
target_selector = None
capture_type = 'page' # Default to full page
# --- Mode-dependent handling ---
if self.automated_mode:
baseline_id = default_baseline_id or f"baseline_{self._current_step_id}" # Ensure ID exists
# Log directly instead of rebinding the built-in print: assigning to 'print' here would make it a
# local variable for all of record(), breaking the earlier interactive print() calls.
logger.info(f"[Auto Mode Baseline] Capturing baseline: '{baseline_id}' (Full Page - Default)")
# Note: Automated mode currently only supports full page baselines.
# To support element baselines, we'd need AI to suggest selector or pre-define targets.
else: # Interactive Mode
print("\n" + "="*60)
print(f"Planned Step: {planned_desc}")
baseline_id = input(f"Enter Baseline ID (default: '{default_baseline_id}'): ").strip() or default_baseline_id
capture_choice = input("Capture Full Page (P) or Specific Element (E)? [P]: ").strip().lower()
if capture_choice == 'e':
capture_type = 'element'
print("Click the element to capture as baseline...")
self.browser_controller.clear_highlights() # Clear any previous
listener_setup = self.browser_controller.setup_click_listener()
if listener_setup:
try:
target_selector = self.browser_controller.wait_for_user_click_or_timeout(20.0)
if target_selector:
print(f"Element selected. Using selector: {target_selector}")
# Highlight the selected element briefly
try:
self.browser_controller.highlight_element(target_selector, 0, color="#00FF00", text="Baseline Element")
time.sleep(1.5) # Show highlight briefly
except Exception: pass # Ignore highlight errors
else:
print("No element selected (timeout). Defaulting to Full Page.")
capture_type = 'page'
except Exception as e:
logger.error(f"Error during element selection for baseline: {e}")
print("Error selecting element. Defaulting to Full Page.")
capture_type = 'page'
self.browser_controller.remove_click_listener() # Clean up listener
else:
print("Error setting up click listener. Defaulting to Full Page.")
capture_type = 'page'
else: # Default to Page
print("Capturing Full Page baseline.")
capture_type = 'page'
# --- Capture and Save Baseline ---
capture_success = False
final_screenshot_bytes = None
if capture_type == 'element' and target_selector:
final_screenshot_bytes = self.browser_controller.take_screenshot_element(target_selector)
if final_screenshot_bytes:
capture_success = self._save_visual_baseline(baseline_id, final_screenshot_bytes, selector=target_selector)
else:
logger.error(f"Failed to capture element screenshot for baseline '{baseline_id}' selector '{target_selector}'.")
if not self.automated_mode: print("Error: Failed to capture element screenshot.")
else: # Full page
# Use the screenshot already taken during state gathering if available
final_screenshot_bytes = screenshot_bytes
if final_screenshot_bytes:
capture_success = self._save_visual_baseline(baseline_id, final_screenshot_bytes, selector=None)
else:
logger.error(f"Failed to capture full page screenshot for baseline '{baseline_id}'.")
if not self.automated_mode: print("Error: Failed to capture full page screenshot.")
# --- Record assert_visual_match step ---
if capture_success:
record = {
"step_id": self._current_step_id,
"action": "assert_visual_match", # The corresponding execution action
"description": planned_desc, # Use the baseline description
"parameters": {"baseline_id": baseline_id},
"selector": target_selector, # Null for page, selector for element
"wait_after_secs": DEFAULT_WAIT_AFTER_ACTION
}
self.recorded_steps.append(record)
self._current_step_id += 1
logger.info(f"Step {record['step_id']} recorded: assert_visual_match for baseline '{baseline_id}' ({'Element' if target_selector else 'Page'})")
self.task_manager.update_subtask_status(current_task_index, "done", result=f"Recorded baseline '{baseline_id}'")
self._consecutive_suggestion_failures = 0
else:
# Baseline capture/save failed
logger.error(f"Failed to save baseline '{baseline_id}'. Skipping recording.")
if not self.automated_mode: print(f"Failed to save baseline '{baseline_id}'. Skipping.")
self.task_manager.update_subtask_status(current_task_index, "skipped", result="Failed to save baseline")
step_handled_internally = True # Baseline capture handled
# --- 5. Wait Step ---
elif planned_step_desc_lower.startswith("wait for"):
logger.info(f"Handling planned wait step: {current_planned_task['description']}")
wait_params = {}
wait_selector = None
wait_desc = current_planned_task['description']
parsed_ok = False # Flag to check if parameters were parsed
try:
# Try to parse common patterns
time_match = re.search(r"wait for (\d+(\.\d+)?)\s+seconds?", planned_step_desc_lower)
# Updated regex to be more flexible with optional 'element' word and quotes
element_match = re.search(r"wait for (?:element\s*)?\'?(.*?)\'?\s+to be\s+(\w+)", planned_step_desc_lower)
url_match = re.search(r"wait for url\s*\'?(.*?)\'?\s*$", planned_step_desc_lower) # Anchor at end of string so the lazy group actually captures the URL
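# Illustrative step phrasings each pattern is meant to catch (hypothetical text):
#   "wait for 3 seconds"                            -> time_match  (timeout_seconds = 3.0)
#   "wait for element '#spinner' to be hidden"      -> element_match (description '#spinner', state 'hidden')
#   "wait for url 'https://example.com/dashboard'"  -> url_match  (url pattern)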
if time_match:
wait_params["timeout_seconds"] = float(time_match.group(1))
wait_action = "wait"
parsed_ok = True
elif element_match:
element_desc_for_selector = element_match.group(1).strip()
state = element_match.group(2).strip()
wait_params["state"] = state
# --- Attempt to resolve selector during recording ---
# For simplicity, let's *assume* the description IS the selector for now.
# A better approach would use LLM or prompt user.
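# Illustrative descriptions treated as selectors by the heuristic below: "#save-btn", ".spinner",
# "[data-test=login]", or an XPath such as "//div[@id='cart']" (contains '/').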
if element_desc_for_selector.startswith(('#', '.', '[')) or '/' in element_desc_for_selector:
wait_selector = element_desc_for_selector
logger.info(f"Using description '{wait_selector}' directly as selector for wait.")
else:
logger.warning(f"Cannot directly use '{element_desc_for_selector}' as selector. Wait step might fail execution. Recording intent.")
# Record without selector, executor might fail unless enhanced
wait_desc += f" (Element Description: {element_desc_for_selector})" # Add detail to desc
wait_params["selector"] = wait_selector # Store selector (or None) in params for execution call
wait_action = "wait" # Still use generic wait
parsed_ok = True
elif url_match:
wait_params["url"] = url_match.group(1) # URL pattern
wait_action = "wait" # Use generic wait
parsed_ok = True
else:
logger.warning(f"Could not parse wait parameters from: '{current_planned_task['description']}'. Skipping.")
self.task_manager.update_subtask_status(current_task_index, "skipped", result="Unknown wait format")
wait_action = None
parsed_ok = False
if parsed_ok and wait_action:
# --- Execute the wait ---
logger.info(f"Executing wait action: {wait_params}")
wait_exec_result = self.browser_controller.wait(**wait_params)
if wait_exec_result["success"]:
logger.info("Wait execution successful during recording.")
# --- Record the step AFTER successful execution ---
self.recorded_steps.append({
"step_id": self._current_step_id, "action": wait_action, "description": wait_desc,
"parameters": wait_params, # Use the parsed params
"selector": wait_selector, # Record selector if found
"wait_after_secs": 0
})
self._current_step_id += 1
logger.info(f"Step {self._current_step_id-1} recorded: {wait_action} with params {wait_params}")
self.task_manager.update_subtask_status(current_task_index, "done", result="Recorded and executed wait step")
else:
# Wait failed during recording
logger.error(f"Wait execution FAILED during recording: {wait_exec_result['message']}")
# Decide how to handle: Skip? Fail? Abort? Let's skip for now.
if not self.automated_mode:
cont = input("Wait failed. Skip this step (S) or Abort recording (A)? [S]: ").strip().lower()
if cont == 'a':
self._user_abort_recording = True
self._abort_reason = "User aborted after wait failed."
else:
self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Wait failed during recording: {wait_exec_result['message']}")
else: # Automated mode - just skip
self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Wait failed during recording: {wait_exec_result['message']}")
except Exception as wait_e:
logger.error(f"Error parsing or executing wait step: {wait_e}")
# Mark as failed if parsing/execution error occurred
self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Wait step processing failed: {wait_e}")
step_handled_internally = True
# --- 6. Default: Assume Interactive Click/Type ---
if not step_handled_internally:
# --- AI Suggestion ---
logger.debug(f"DOM context provided for action suggestion:\n{dom_context_str}")
ai_suggestion = self._determine_action_and_selector_for_recording(
current_planned_task, current_url, dom_context_str
)
# --- Handle Suggestion Result ---
if not ai_suggestion or ai_suggestion.get("action") == "suggestion_failed":
reason = ai_suggestion.get("reasoning", "LLM failed to provide valid suggestion.") if ai_suggestion else "LLM suggestion generation failed."
logger.error(f"AI suggestion failed for step {current_task_index + 1}: {reason}")
self._consecutive_suggestion_failures += 1
# Check if we should try re-planning due to repeated failures
if self._consecutive_suggestion_failures > self.task_manager.max_retries_per_subtask:
logger.warning(f"Maximum suggestion retries exceeded for step {current_task_index + 1}. Triggering re-planning.")
replan_reason = f"AI failed to suggest an action/selector repeatedly for step: '{current_planned_task['description']}'. Last reason: {reason}"
if self._trigger_re_planning(current_planned_task, replan_reason):
logger.info("Re-planning successful after suggestion failures.")
# Loop continues with recovery steps
else:
# Re-planning failed or user aborted/skipped
if not self._user_abort_recording:
logger.error("Re-planning failed/declined. Marking original step as failed permanently.")
self.task_manager.update_subtask_status(current_task_index, "failed", error=f"Failed permanently after repeated suggestion errors and failed re-planning attempt. Last reason: {reason}", force_update=True)
else:
# Mark as failed for normal retry by TaskManager
self.task_manager.update_subtask_status(current_task_index, "failed", error=reason)
# Continue loop, TaskManager will offer retry if possible
elif ai_suggestion.get("action") == "action_not_applicable":
reason = ai_suggestion.get("reasoning", "Step not a click/type.")
logger.info(f"Planned step '{current_planned_task['description']}' determined not applicable by AI. Skipping. Reason: {reason}")
# Could this trigger re-planning? Maybe if it happens unexpectedly. For now, treat as skip.
self.task_manager.update_subtask_status(current_task_index, "skipped", result=f"Skipped non-interactive step ({reason})")
self._consecutive_suggestion_failures = 0 # Reset counter on skip
elif ai_suggestion.get("action") in ["click", "type", "check", "uncheck", "select", "key_press", "drag_and_drop"]:
# --- Handle Interactive Step (Confirmation/Override/Execution) ---
# This method now returns True if handled (recorded, skipped, retry requested), False if aborted
# It also internally updates task status based on outcome.
handled_ok = self._handle_interactive_step_recording(current_planned_task, ai_suggestion)
if not handled_ok and self._user_abort_recording:
logger.warning("User aborted during interactive step handling.")
break # Exit main loop immediately on abort
# Check if the step failed execution and might need re-planning
current_task_status = self.task_manager.subtasks[current_task_index]['status']
if current_task_status == 'failed':
# _handle_interactive_step_recording marks failed if execution fails and user doesn't skip/abort
# Check if it was an execution failure (not just suggestion retry)
error_msg = self.task_manager.subtasks[current_task_index].get('error', '')
if "Execution failed" in error_msg: # Check for execution failure messages
logger.warning(f"Execution failed for step {current_task_index + 1}. Triggering re-planning.")
replan_reason = f"Action execution failed for step '{current_planned_task['description']}'. Error: {error_msg}"
if self._trigger_re_planning(current_planned_task, replan_reason):
logger.info("Re-planning successful after execution failure.")
# Loop continues with recovery steps
else:
# Re-planning failed or declined
if not self._user_abort_recording:
logger.error("Re-planning failed/declined after execution error. Step remains failed.")
# Task already marked as failed by _handle_interactive_step_recording
# else: It was likely marked failed to retry suggestion - allow normal retry flow
elif current_task_status == 'done' or current_task_status == 'skipped':
self._consecutive_suggestion_failures = 0 # Reset failure counter on success/skip
else: # Should not happen
logger.error(f"Unexpected AI suggestion action: {ai_suggestion.get('action')}. Skipping step.")
self.task_manager.update_subtask_status(current_task_index, "failed", error="Unexpected AI action suggestion")
# --- Cleanup after processing a step attempt ---
self.browser_controller.clear_highlights()
# Listener removal is handled within _handle_interactive_step_recording and wait_for_user_click...
# self.browser_controller.remove_click_listener() # Ensure listener is off - redundant?
# Small delay between steps/attempts
if not self._user_abort_recording: # Don't delay if aborting
time.sleep(0.3)
# --- Loop End ---
if not recording_status["success"] and iteration_count >= MAX_RECORDING_ITERATIONS:
recording_status["message"] = f"⚠️ Recording Stopped: Maximum iterations ({MAX_RECORDING_ITERATIONS}) reached."
recording_status["success"] = False # Ensure max iterations means failure
logger.warning(recording_status["message"])
# --- Final Save ---
if not self._user_abort_recording and self.recorded_steps:
try:
if recording_status.get("success", False): # Only check if currently marked as success
perm_failed_tasks_final = [t for t in self.task_manager.subtasks if t['status'] == 'failed' and t['attempts'] > self.task_manager.max_retries_per_subtask]
if perm_failed_tasks_final:
recording_status["success"] = False # Override success if any task failed
recording_status["message"] = recording_status["message"].replace("completed.", "completed with failures.") # Adjust message
logger.warning("Overriding overall success status to False due to permanently failed steps found.")
output_data = {
"test_name": f"{feature_description[:50]}_Test",
"feature_description": feature_description,
"recorded_at": datetime.utcnow().isoformat() + "Z",
"console_logs": self.browser_controller.console_messages,
"steps": self.recorded_steps
}
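# Illustrative shape of the saved file (hedged example; values are hypothetical):
# {
#   "test_name": "Login with valid credentials_Test",
#   "feature_description": "Login with valid credentials ...",
#   "recorded_at": "2024-01-01T12:00:00Z",
#   "console_logs": ["..."],
#   "steps": [{"step_id": 1, "action": "navigate", "parameters": {"url": "https://example.com"}, ...}]
# }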
recording_status["console_messages"] = self.browser_controller.console_messages
ts = time.strftime("%Y%m%d_%H%M%S")
safe_feature_name = re.sub(r'[^\w\-]+', '_', feature_description)[:50]
if self.file_name is None:
self.file_name = f"test_{safe_feature_name}_{ts}.json"
else:
self.file_name = f"{self.file_name}_{safe_feature_name}_{ts}_test.json"
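# Illustrative results (hypothetical): a preset file_name "smoke" with feature "Login flow" becomes
# "smoke_Login_flow_20240101_120000_test.json"; with no preset name, "test_Login_flow_20240101_120000.json".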
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
self.output_file_path = os.path.join(output_dir, self.file_name)
with open(self.output_file_path, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
recording_status["output_file"] = self.output_file_path
recording_status["steps_recorded"] = len(self.recorded_steps)
# Set success only if we saved something and didn't explicitly fail/abort
if recording_status["success"]:
logger.info(f"Recording successfully saved to: {self.output_file_path}")
else:
logger.warning(f"Recording finished with status: {'Failed' if not self._user_abort_recording else 'Aborted'}. Saved {len(self.recorded_steps)} steps to: {self.output_file_path}. Message: {recording_status.get('message')}")
except Exception as save_e:
logger.error(f"Failed to save recorded steps to JSON: {save_e}", exc_info=True)
recording_status["message"] = f"Failed to save recording: {save_e}"
recording_status["success"] = False
elif self._user_abort_recording:
if not self._abort_reason:
recording_status["message"] = "Recording aborted by user. No file saved."
recording_status["success"] = False
else: # No steps recorded
recording_status["message"] = "No steps were recorded."
recording_status["success"] = False
except ValueError as e: # Catch planning errors specifically
logger.critical(f"Test planning failed: {e}", exc_info=True)
recording_status["message"] = f"❌ Test Planning Failed: {e}"
recording_status["success"] = False # Ensure failure state
except Exception as e:
logger.critical(f"An critical unexpected error occurred during recording: {e}", exc_info=True)
recording_status["message"] = f"❌ Critical Error during recording: {e}"
recording_status["success"] = False # Ensure failure state
finally:
logger.info("--- Ending Test Recording ---")
# Ensure cleanup even if browser wasn't started fully
if hasattr(self, 'browser_controller') and self.browser_controller:
self.browser_controller.clear_highlights()
self.browser_controller.remove_click_listener() # Attempt removal
self.panel.remove_recorder_panel()
self.browser_controller.close()
end_time = time.time()
recording_status["duration_seconds"] = round(end_time - start_time, 2)
logger.info(f"Recording process finished in {recording_status['duration_seconds']:.2f} seconds.")
logger.info(f"Final Recording Status: {'Success' if recording_status['success'] else 'Failed/Aborted'} - {recording_status['message']}")
if recording_status.get("output_file"):
logger.info(f"Output file: {recording_status.get('output_file')}")
return recording_status # Return the detailed status dictionary