"""
Test case generation and execution module for Percepta MCP.
"""
import asyncio
import json
import logging
import os
import tempfile
from datetime import datetime, UTC
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, Literal
from ..config import Settings
from ..ai_router import AIRouter
from .browser_tools import BrowserAutomation
logger = logging.getLogger(__name__)
class AutomatedTestGenerator:
"""Generates and executes front-end test cases based on AI analysis."""
def __init__(self, settings: Settings, ai_router: Optional[AIRouter] = None,
browser_automation: Optional[BrowserAutomation] = None):
self.settings = settings
self.ai_router = ai_router
self.browser_automation = browser_automation or BrowserAutomation(settings)
self.test_results_dir = Path("test_results")
self.test_results_dir.mkdir(exist_ok=True)
async def generate_test_case(self,
url: str,
description: str,
test_type: Literal["navigation", "form", "visual", "accessibility", "e2e"] = "navigation",
elements: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
"""
Generate a test case based on the provided URL and description.
Args:
url: The URL of the page to test
description: Description of the test scenario
test_type: Type of test to generate
elements: Optional list of elements to interact with
Returns:
A dictionary containing the generated test case
"""
try:
logger.info(f"Generating test case for {url}: {description}")
if self.ai_router is None:
return {
"success": False,
"error": "AI router not initialized",
"test_case": None
}
# Navigate to the page and get context information
browser_result = await self.browser_automation.navigate(url)
if not browser_result.get("success"):
return {
"success": False,
"error": f"Failed to navigate to URL: {browser_result.get('error')}",
"test_case": None
}
# Extract page info for context
page_info = await self.browser_automation.get_page_info()
page_text = await self.browser_automation.extract_text()
# Generate test steps with AI
prompt = f"""Generate a Playwright test script for the following scenario:
URL: {url}
Description: {description}
Test Type: {test_type}
Page Information:
Title: {page_info.get('title')}
URL: {page_info.get('url')}
Requirements:
1. Create a complete, working Playwright test script in Python
2. Include necessary imports and setup
3. Add detailed comments explaining each step
4. Include proper assertions to validate the test
5. Make the test robust with appropriate waits and error handling
6. Return ONLY executable Python code without any explanations
Current Page Content Summary:
{str(page_text)[:1500] if page_text else "No content available"}
"""
if elements:
prompt += "\nSpecific elements to interact with:\n"
for i, element in enumerate(elements):
prompt += f"{i+1}. {element.get('description', 'Element')}: {element.get('selector', 'No selector')}\n"
# Generate the test script using AI
ai_response = await self.ai_router.generate(prompt, provider=None)
if ai_response.error:
return {
"success": False,
"error": f"AI generation error: {ai_response.error}",
"test_case": None
}
# Extract code from the response
test_script = ai_response.content.strip()
# Make sure we're only getting the Python code
if "```python" in test_script:
test_script = test_script.split("```python")[1].split("```")[0].strip()
# Save the generated test case
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
test_file_name = f"test_{test_type}_{timestamp}.py"
test_file_path = self.test_results_dir / test_file_name
with open(test_file_path, "w") as f:
f.write(test_script)
return {
"success": True,
"test_case": {
"name": test_file_name,
"path": str(test_file_path),
"url": url,
"description": description,
"type": test_type,
"script": test_script,
"timestamp": datetime.now(UTC).isoformat()
}
}
except Exception as e:
logger.error(f"Error generating test case: {e}")
return {
"success": False,
"error": f"Failed to generate test case: {str(e)}",
"test_case": None
}
async def execute_test_case(self, test_case_path: Union[str, Path]) -> Dict[str, Any]:
"""
Execute a previously generated test case.
Args:
test_case_path: Path to the test case file
Returns:
A dictionary containing the test execution results
"""
try:
logger.info(f"Executing test case: {test_case_path}")
test_path = Path(test_case_path)
if not test_path.exists():
return {
"success": False,
"error": f"Test file not found: {test_case_path}",
"results": None
}
# Create temporary directory for results
with tempfile.TemporaryDirectory() as tmpdirname:
# Create a pytest command to execute the test
result = await asyncio.create_subprocess_shell(
f"cd {tmpdirname} && python -m pytest {test_path} -v",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await result.communicate()
stdout_str = stdout.decode()
stderr_str = stderr.decode()
success = result.returncode == 0
return {
"success": success,
"test_path": str(test_path),
"exit_code": result.returncode,
"output": stdout_str,
"errors": stderr_str if result.returncode != 0 else None,
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Error executing test case: {e}")
return {
"success": False,
"error": f"Failed to execute test case: {str(e)}",
"results": None
}
async def generate_and_execute_test(self,
url: str,
description: str,
test_type: Literal["navigation", "form", "visual", "accessibility", "e2e"] = "navigation",
elements: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
"""
Generate and immediately execute a test case.
Args:
url: The URL of the page to test
description: Description of the test scenario
test_type: Type of test to generate
elements: Optional list of elements to interact with
Returns:
A dictionary containing both the generated test case and execution results
"""
# Generate the test case
generation_result = await self.generate_test_case(url, description, test_type, elements)
if not generation_result["success"]:
return generation_result
# Execute the test case
test_path = generation_result["test_case"]["path"]
execution_result = await self.execute_test_case(test_path)
# Combine results
return {
"success": execution_result["success"],
"test_case": generation_result["test_case"],
"execution": execution_result,
"timestamp": datetime.now(UTC).isoformat()
}