Skip to main content
Glama
task_manager.py8.54 kB
# /src/core/task_manager.py import logging from typing import List, Dict, Any, Optional logger = logging.getLogger(__name__) class TaskManager: """Manages the main task, subtasks, progress, and status.""" def __init__(self, max_retries_per_subtask: int = 2): # Renamed parameter for clarity internally self.main_task: str = "" # Stores the overall feature description self.subtasks: List[Dict[str, Any]] = [] # Stores the individual test steps self.current_subtask_index: int = 0 # Index of the step being processed or next to process self.max_retries_per_subtask: int = max_retries_per_subtask logger.info(f"TaskManager (Test Mode) initialized (max_retries_per_step={max_retries_per_subtask}).") def set_main_task(self, feature_description: str): """Sets the main feature description being tested.""" self.main_task = feature_description self.subtasks = [] self.current_subtask_index = 0 logger.info(f"Feature under test set: {feature_description}") def add_subtasks(self, test_step_list: List[str]): """Adds a list of test steps derived from the feature description.""" if not self.main_task: logger.error("Cannot add test steps before setting a feature description.") return if not isinstance(test_step_list, list) or not all(isinstance(s, str) and s for s in test_step_list): logger.error(f"Invalid test step list format received: {test_step_list}") raise ValueError("Test step list must be a non-empty list of non-empty strings.") self.subtasks = [] # Clear existing steps before adding new ones for desc in test_step_list: self.subtasks.append({ "description": desc, # The test step description "status": "pending", # pending, in_progress, done, failed "attempts": 0, "result": None, # Store result of the step (e.g., extracted text) "error": None, # Store error if the step failed "_recorded_": False, "last_failed_selector": None # Store selector if failure was element-related }) self.current_subtask_index = 0 if self.subtasks else -1 # Reset index logger.info(f"Added {len(test_step_list)} test steps.") def insert_subtasks(self, index: int, new_step_descriptions: List[str]): """Inserts new test steps at a specific index.""" if not isinstance(new_step_descriptions, list) or not all(isinstance(s, str) and s for s in new_step_descriptions): logger.error(f"Invalid new step list format received for insertion: {new_step_descriptions}") return False # Indicate failure if not (0 <= index <= len(self.subtasks)): # Allow insertion at the end logger.error(f"Invalid index {index} for inserting subtasks (Total steps: {len(self.subtasks)}).") return False new_tasks = [] for desc in new_step_descriptions: new_tasks.append({ "description": desc, "status": "pending", # New tasks start as pending "attempts": 0, "result": None, "error": None, "_recorded_": False, # Ensure internal flags are initialized "last_failed_selector": None }) # Insert the new tasks into the list self.subtasks[index:index] = new_tasks logger.info(f"Inserted {len(new_tasks)} new subtasks at index {index}.") # Crucial: If the insertion happens at or before the current index, # we might need to adjust the current index, but generally, the next call # to get_next_subtask() should find the newly inserted pending tasks if they # are before the previously 'current' task. Let get_next_subtask handle finding the next actionable item. # If insertion happens *after* current processing index, it doesn't immediately affect flow. return True # Indicate success def get_next_subtask(self) -> Optional[Dict[str, Any]]: """ Gets the first test step that is 'pending' or 'failed' with retries remaining. Iterates sequentially. """ for index, task in enumerate(self.subtasks): # In recorder mode, 'failed' means AI suggestion failed, allow retry # In executor mode (if used here), 'failed' means execution failed is_pending = task["status"] == "pending" is_retryable_failure = (task["status"] == "failed" and task["attempts"] <= self.max_retries_per_subtask) if is_pending or is_retryable_failure: # Found the next actionable step if is_retryable_failure: logger.info(f"Retrying test step {index + 1} (Attempt {task['attempts'] + 1}/{self.max_retries_per_subtask + 1})") else: # Pending logger.info(f"Starting test step {index + 1}/{len(self.subtasks)}: {task['description']}") # Update the main index to point to this task BEFORE changing status self.current_subtask_index = index task["status"] = "in_progress" task["attempts"] += 1 # Keep error context on retry, clear result task["result"] = None return task # No actionable tasks found logger.info("No more actionable test steps found.") self.current_subtask_index = len(self.subtasks) # Mark completion return None def update_subtask_status(self, index: int, status: str, result: Any = None, error: Optional[str] = None, force_update: bool = False): """Updates the status of a specific test step.""" if 0 <= index < len(self.subtasks): task = self.subtasks[index] current_status = task["status"] # Allow update only if forced or if task is 'in_progress' # if not force_update and task["status"] != "in_progress": # logger.warning(f"Attempted to update status of test step {index + 1} ('{task['description'][:50]}...') " # f"from '{task['status']}' to '{status}', but it's not 'in_progress'. Ignoring (unless force_update=True).") # return # Log if the status is actually changing if current_status != status: logger.info(f"Updating Test Step {index + 1} status from '{current_status}' to '{status}'.") else: logger.debug(f"Test Step {index + 1} status already '{status}'. Updating result/error.") task["status"] = status task["result"] = result task["error"] = error log_message = f"Test Step {index + 1} ('{task['description'][:50]}...') processed. Status: {status}." if result and status == 'done': log_message += f" Result: {str(result)[:100]}..." if error: log_message += f" Error/Note: {error}" # Use debug for potentially repetitive updates if status doesn't change log_level = logging.INFO if current_status != status else logging.DEBUG logger.log(log_level, log_message) # Log permanent failure clearly if status == "failed" and task["attempts"] > self.max_retries_per_subtask: logger.warning(f"Test Step {index + 1} failed permanently after {task['attempts']} attempts.") else: logger.error(f"Invalid index {index} for updating test step status (Total steps: {len(self.subtasks)}).") def get_current_subtask(self) -> Optional[Dict[str, Any]]: """Gets the test step currently marked by current_subtask_index (likely 'in_progress').""" if 0 <= self.current_subtask_index < len(self.subtasks): return self.subtasks[self.current_subtask_index] return None def is_complete(self) -> bool: """Checks if all test steps have been processed (are 'done' or 'failed' permanently).""" for task in self.subtasks: if task['status'] == 'pending' or \ task['status'] == 'in_progress' or \ (task['status'] == 'failed' and task['attempts'] <= self.max_retries_per_subtask): return False # Found an actionable step return True # All steps processed

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GroundNG/QA-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server