from __future__ import annotations
import hashlib
import json
import logging
import os
import time
from typing import Dict, List, Optional
from appium import webdriver
from appium.options.common.base import AppiumOptions
from appium.webdriver.common.appiumby import AppiumBy
from openai import AzureOpenAI
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from .config import AppiumConfig, CrawlerSettings
# Module-level logger named after this module. Handlers and levels are not
# configured here; that is left to whatever sets up logging for the application.
logger = logging.getLogger(__name__)
class AppCrawler:
    """AI-powered mobile app crawler that uses LLM guidance to explore applications.

    The crawler drives an Appium session, sends each screen's XML page source
    to an Azure OpenAI chat model, performs the action the model replies with,
    and finally asks the model to turn the recorded steps into a JavaScript
    test case stored in the output directory.
    """

    # Model name sent with every chat-completion request.
    MODEL_NAME = "gpt-4o"
    # Key code passed to ``press_keycode`` for the "back" action.
    BACK_KEYCODE = 4
    # Number of consecutive unchanged-screen observations after which the LLM
    # is asked for a recovery action.
    MAX_UNCHANGED_RETRIES = 2

    def __init__(self, settings: CrawlerSettings):
        """Set up crawler state, the output directory, the LLM client and the driver.

        Args:
            settings: Crawler configuration. The attributes read here are
                ``platform``, ``output_dir``, ``wait_time``, ``test_example``,
                ``azure`` and ``appium``.
        """
        self.settings = settings
        # Defined up front so cleanup() is safe even if construction fails.
        self.driver: Optional[webdriver.Remote] = None
        self.platform = settings.platform.lower()
        self.output_dir = settings.output_dir
        self.wait_time = settings.wait_time
        self.visited_screens: set[str] = set()
        self.completed_actions: set[str] = set()
        self.current_step = 0
        self.test_steps: List[Dict[str, str]] = []
        self.test_example = settings.test_example or ""
        os.makedirs(self.output_dir, exist_ok=True)
        self.client = self._create_client(settings)
        # The Appium session is created last: if any earlier step raised, a
        # session opened before it would be left running with nobody holding
        # a reference that could quit it.
        self.driver = self._create_driver(settings.appium)

    def _create_driver(self, appium: AppiumConfig) -> webdriver.Remote:
        """Open a remote Appium session using the configured capabilities."""
        logger.info("Initializing Appium driver for server %s", appium.server_url)
        options = AppiumOptions()
        options.load_capabilities(appium.desired_capabilities)
        return webdriver.Remote(command_executor=appium.server_url, options=options)

    def _create_client(self, settings: CrawlerSettings) -> AzureOpenAI:
        """Build the Azure OpenAI client from ``settings.azure``."""
        logger.info("Initializing Azure OpenAI client")
        return AzureOpenAI(
            api_key=settings.azure.api_key,
            azure_endpoint=settings.azure.azure_endpoint,
            api_version=settings.azure.api_version,
        )

    @staticmethod
    def _strip_code_fence(text: str, *languages: str) -> str:
        """Return *text* without a surrounding Markdown code fence.

        Args:
            text: Raw LLM reply.
            *languages: Language tags that may follow the opening fence
                (for example ``"json"``). Matching is case-insensitive.

        Returns:
            The reply with leading/trailing whitespace, an opening
            ```` ``` ```` (plus one of *languages*, if present) and a closing
            ```` ``` ```` removed.
        """
        body = text.strip()
        if body.startswith("```"):
            body = body[3:]
            # Longest tag first so "javascript" is preferred over "js".
            for tag in sorted(languages, key=len, reverse=True):
                if tag and body[: len(tag)].lower() == tag.lower():
                    body = body[len(tag):]
                    break
        if body.endswith("```"):
            body = body[:-3]
        return body.strip()

    @staticmethod
    def _field(instruction: Dict[str, str], key: str) -> str:
        """Return ``instruction[key]`` as a string, or "" when missing or null."""
        value = instruction.get(key)
        return "" if value is None else str(value)

    @staticmethod
    def _resource_id_xpath(resource_id: str) -> str:
        """Build an XPath matching elements whose ``resource-id`` equals *resource_id*."""
        # Switch to double quotes when the id itself contains a single quote,
        # otherwise the XPath string literal would be terminated early.
        quote = '"' if "'" in resource_id else "'"
        return f"//*[@resource-id={quote}{resource_id}{quote}]"

    def get_screen_hash(self, xml: Optional[str] = None) -> str:
        """Generate a hash identifying a screen's XML.

        Args:
            xml: Page source to hash. When omitted, the driver's current page
                source is fetched.

        Returns:
            Hex MD5 digest of the UTF-8 encoded XML (used as an identifier,
            not for security).
        """
        if xml is None:
            xml = self.driver.page_source
        return hashlib.md5(xml.encode("utf-8")).hexdigest()

    def save_screen_xml(
        self,
        screen_hash: str,
        screen_name: Optional[str] = None,
        xml: Optional[str] = None,
    ) -> str:
        """Save a screen's XML and a screenshot of the current screen.

        Args:
            screen_hash: Hash used as the file name prefix.
            screen_name: Suffix for the file name; defaults to ``"screen"``.
            xml: Page source to write. When omitted, the driver's current page
                source is fetched, which may differ from the source that
                *screen_hash* was computed from.

        Returns:
            Path of the written ``.xml`` file. Screenshot failures are logged
            and do not affect the return value.
        """
        base_name = f"{screen_hash}_{screen_name or 'screen'}"
        filepath = os.path.join(self.output_dir, f"{base_name}.xml")
        if xml is None:
            xml = self.driver.page_source
        with open(filepath, "w", encoding="utf-8") as handle:
            handle.write(xml)
        # Built from the base name rather than by replacing ".xml" inside the
        # full path, which would also rewrite a ".xml" in the directory name.
        screenshot_path = os.path.join(self.output_dir, f"{base_name}.png")
        try:
            # save_screenshot reports an I/O failure by returning False.
            if not self.driver.save_screenshot(screenshot_path):
                logger.error("Failed to save screenshot: %s", screenshot_path)
        except Exception as exc:  # noqa: BLE001 - Appium throws various exception types
            logger.error("Failed to save screenshot: %s", exc, exc_info=True)
        return filepath

    def query_llm_for_next_action(
        self,
        xml: str,
        task_description: str,
        task_prompt: str,
    ) -> Dict[str, str]:
        """Query the LLM for the next action.

        Args:
            xml: Page source of the current screen.
            task_description: Description of the task being automated.
            task_prompt: ``str.format`` template receiving ``xml``,
                ``task_description``, ``current_step`` and ``completed_actions``.

        Returns:
            The JSON object parsed from the reply. On an empty reply, a reply
            that is not a JSON object, or any client error, a dict with
            ``"action": "done"`` and an explanatory ``"description"``.
        """
        prompt = task_prompt.format(
            xml=xml,
            task_description=task_description,
            current_step=self.current_step,
            # Sorted so the prompt does not depend on set iteration order.
            completed_actions=sorted(self.completed_actions),
        )
        answer: Optional[str] = None
        try:
            response = self.client.chat.completions.create(
                model=self.MODEL_NAME,
                messages=[{"role": "user", "content": prompt}],
                temperature=0,
            )
            answer = response.choices[0].message.content
            if not answer:
                logger.error("LLM response did not include content.")
                return {"action": "done", "description": "Empty LLM response"}
            logger.debug("Raw LLM response: %s", answer)
            instruction = json.loads(self._strip_code_fence(answer, "json"))
            if not isinstance(instruction, dict):
                # Valid JSON, but callers rely on dict access.
                logger.error("LLM response was not a JSON object: %r", instruction)
                return {"action": "done", "description": "Failed to parse LLM response"}
            logger.info("LLM returned: %s", instruction.get("description"))
            return instruction
        except json.JSONDecodeError as exc:
            logger.error("Failed to parse LLM response as JSON: %s", exc)
            logger.debug("Raw response was: %s", answer)
            return {"action": "done", "description": "Failed to parse LLM response"}
        except Exception as exc:  # noqa: BLE001 - Azure client can raise various errors
            logger.error("Error querying LLM: %s", exc, exc_info=True)
            return {"action": "done", "description": f"Error: {exc}"}

    def get_mobile_by(self, locator_strategy: str) -> str:
        """Map a locator strategy string to Appium's locator constants.

        Matching ignores case and treats spaces and hyphens as underscores.
        Unrecognized (or empty) strategies fall back to XPath, with a warning.
        """
        strategy = (locator_strategy or "").strip().lower()
        strategy = strategy.replace(" ", "_").replace("-", "_")
        known = {
            "id": AppiumBy.ID,
            "xpath": AppiumBy.XPATH,
            "accessibility_id": AppiumBy.ACCESSIBILITY_ID,
        }
        if strategy not in known:
            logger.warning(
                "Unknown locator strategy %r; falling back to xpath", locator_strategy
            )
            return AppiumBy.XPATH
        return known[strategy]

    def perform_click(self, locator_strategy: str, locator_value: str) -> bool:
        """Perform a click action on an element.

        Waits up to ``wait_time`` seconds for the element to be clickable.

        Returns:
            True if the click was issued, False if locating or clicking the
            element raised (the error is logged, not propagated).
        """
        by = self.get_mobile_by(locator_strategy)
        try:
            element = WebDriverWait(self.driver, self.wait_time).until(
                EC.element_to_be_clickable((by, locator_value)),
            )
            element.click()
        except Exception as exc:  # noqa: BLE001 - Appium and Selenium exceptions vary
            logger.error("Error performing click: %s", exc, exc_info=True)
            return False
        logger.info("Clicked on element with %s = %s", locator_strategy, locator_value)
        return True

    def perform_send_keys(
        self,
        locator_strategy: str,
        locator_value: str,
        text: str,
    ) -> bool:
        """Send keys to an element.

        Waits up to ``wait_time`` seconds for the element to be present.

        Returns:
            True if the text was sent, False if locating the element or
            sending the keys raised (the error is logged, not propagated).
        """
        by = self.get_mobile_by(locator_strategy)
        try:
            element = WebDriverWait(self.driver, self.wait_time).until(
                EC.presence_of_element_located((by, locator_value)),
            )
            element.send_keys(text)
        except Exception as exc:  # noqa: BLE001
            logger.error("Error performing send_keys: %s", exc, exc_info=True)
            return False
        logger.info("Sent keys to element with %s = %s", locator_strategy, locator_value)
        return True

    def perform_back(self) -> bool:
        """Navigate back one screen.

        On the "android" platform the back key code is pressed; on any other
        platform the driver's generic ``back()`` command is used instead.

        Returns:
            True on success, False if the driver raised (the error is logged).
        """
        try:
            if self.platform == "android":
                self.driver.press_keycode(self.BACK_KEYCODE)
            else:
                self.driver.back()
        except Exception as exc:  # noqa: BLE001
            logger.error("Error performing back: %s", exc, exc_info=True)
            return False
        return True

    def save_test_case(self, task_description: str, test_case_prompt: str) -> None:
        """Save the test case as a .js file using the LLM to generate it.

        Args:
            task_description: Description of the automated task.
            test_case_prompt: ``str.format`` template receiving
                ``task_description``, ``steps_info`` and ``test_example``.

        The file is written to ``<output_dir>/test_case.js``. Errors from the
        LLM call or the file write are logged and not propagated; nothing is
        written when the reply is empty.
        """
        test_case_path = os.path.join(self.output_dir, "test_case.js")
        steps_info = "\n".join(
            f"Step {index}: {step['description']} - {step['action']} on {step['locator_value']}"
            for index, step in enumerate(self.test_steps, start=1)
        )
        prompt = test_case_prompt.format(
            task_description=task_description,
            steps_info=steps_info,
            test_example=self.test_example,
        )
        try:
            response = self.client.chat.completions.create(
                model=self.MODEL_NAME,
                messages=[{"role": "user", "content": prompt}],
                temperature=0,
            )
            test_case_content = response.choices[0].message.content
            if not test_case_content:
                logger.error("LLM did not provide test case content.")
                return
            # The helper measures the fence prefix itself; a hard-coded slice
            # one character too short used to leave a stray "t" in the file.
            script = self._strip_code_fence(test_case_content, "javascript", "js")
            with open(test_case_path, "w", encoding="utf-8") as handle:
                handle.write(script)
            logger.info("Test case saved to: %s", test_case_path)
        except Exception as exc:  # noqa: BLE001
            logger.error("Error generating test case: %s", exc, exc_info=True)

    def _finish(
        self,
        screen_hash: str,
        screen_name: str,
        xml: str,
        task_description: str,
        test_case_prompt: str,
    ) -> None:
        """Save the final screen and generate the test case once the LLM says "done"."""
        logger.info("Task complete as per LLM instruction.")
        self.save_screen_xml(screen_hash, screen_name, xml)
        logger.info("Saved final screen: %s_%s.xml", screen_hash, screen_name)
        self.save_test_case(task_description, test_case_prompt)

    def _run_recovery_action(self, action: str, instruction: Dict[str, str]) -> None:
        """Execute a recovery instruction; actions other than back/click/send_keys are ignored."""
        if action == "back":
            logger.info("Trying to recover by pressing back...")
            self.perform_back()
            time.sleep(self.wait_time * 2)
            return
        locator_strategy = self._field(instruction, "locator_strategy")
        locator_value = self._field(instruction, "locator_value")
        if action == "click":
            self.perform_click(locator_strategy, locator_value)
        elif action == "send_keys":
            self.perform_send_keys(
                locator_strategy, locator_value, self._field(instruction, "text")
            )

    def process_flow(
        self,
        task_description: str,
        task_prompt: str,
        test_case_prompt: str,
        max_iterations: Optional[int] = None,
    ) -> None:
        """Main processing loop for AI-guided app automation.

        Each iteration captures the page source once, saves it if the screen
        has not been seen before, asks the LLM for the next action and
        performs it. The loop ends when the LLM answers "done" (the final
        screen and the test case are then saved), when the LLM returns an
        action other than click/send_keys, or when *max_iterations* is hit.

        Args:
            task_description: Description of the task to automate.
            task_prompt: Prompt template for ``query_llm_for_next_action``.
            test_case_prompt: Prompt template for ``save_test_case``.
            max_iterations: Optional cap on loop iterations; ``None`` (the
                default) means no cap.
        """
        logger.info("Starting LLM-guided process...")
        step = 1
        iterations = 0
        last_screen_hash: Optional[str] = None
        retry_count = 0
        time.sleep(self.wait_time)
        while True:
            iterations += 1
            if max_iterations is not None and iterations > max_iterations:
                logger.warning(
                    "Stopping: iteration limit of %s reached.", max_iterations
                )
                break

            # Fetch the page source once so the hash, the saved file and the
            # XML sent to the LLM all describe the same snapshot.
            xml = self.driver.page_source
            screen_hash = self.get_screen_hash(xml)
            if screen_hash not in self.visited_screens:
                screen_name = f"step_{step}"
                self.save_screen_xml(screen_hash, screen_name, xml)
                self.visited_screens.add(screen_hash)
                logger.info("Saved screen: %s_%s.xml", screen_hash, screen_name)

            if screen_hash == last_screen_hash:
                retry_count += 1
                if retry_count < self.MAX_UNCHANGED_RETRIES:
                    logger.warning(
                        "Screen hasn't changed. Waiting longer... (attempt %s)",
                        retry_count,
                    )
                    time.sleep(self.wait_time * 2)
                    continue
                logger.warning(
                    "Screen hasn't changed after multiple attempts. Analyzing current state...",
                )
                recovery_instruction = self.query_llm_for_next_action(
                    xml,
                    f"Recovery needed: {task_description}",
                    task_prompt,
                )
                recovery_action = self._field(recovery_instruction, "action").lower()
                if recovery_action == "done":
                    self._finish(
                        screen_hash,
                        f"step_{step}_final_recovery",
                        xml,
                        task_description,
                        test_case_prompt,
                    )
                    break
                # NOTE: recovery actions are executed but not added to
                # test_steps or completed_actions.
                self._run_recovery_action(recovery_action, recovery_instruction)
                retry_count = 0
                continue

            retry_count = 0
            last_screen_hash = screen_hash
            instruction = self.query_llm_for_next_action(
                xml, task_description, task_prompt
            )
            action = self._field(instruction, "action").lower()
            if action == "done":
                self._finish(
                    screen_hash,
                    f"step_{step}_final",
                    xml,
                    task_description,
                    test_case_prompt,
                )
                break

            locator_strategy = self._field(instruction, "locator_strategy")
            locator_value = self._field(instruction, "locator_value")
            if locator_strategy.lower() == "id":
                # Ids are looked up through an XPath on the resource-id attribute.
                locator_strategy = "xpath"
                locator_value = self._resource_id_xpath(locator_value)

            action_signature = f"{action}:{locator_value}"
            if action_signature in self.completed_actions:
                logger.info("Skipping duplicate action: %s", action_signature)
                continue

            # Reject unsupported actions before anything is recorded.
            if action not in ("click", "send_keys"):
                logger.error("Unknown action: %s", action)
                break

            text = self._field(instruction, "text")
            if action == "click":
                succeeded = self.perform_click(locator_strategy, locator_value)
            else:
                succeeded = self.perform_send_keys(locator_strategy, locator_value, text)

            if succeeded:
                self.test_steps.append(
                    {
                        "action": action,
                        "locator_strategy": locator_strategy,
                        "locator_value": locator_value,
                        "text": text,
                        "description": self._field(instruction, "description"),
                    },
                )
                self.completed_actions.add(action_signature)
                self.current_step += 1
            else:
                # A failed action is neither a completed action nor a valid
                # test step; leaving it unrecorded lets it be retried later.
                logger.warning(
                    "Action failed and was not recorded: %s", action_signature
                )
            time.sleep(self.wait_time * 2)
            step += 1

    def cleanup(self) -> None:
        """Clean up resources.

        Quits the Appium session if one exists. Safe to call more than once
        and after a failed ``__init__``.
        """
        driver = getattr(self, "driver", None)
        if not driver:
            return
        logger.info("Closing Appium driver session")
        try:
            driver.quit()
        finally:
            # Drop the reference even if quit() raised, so a later call does
            # not try to quit the same session again.
            self.driver = None