Skip to main content
Glama
hao-cyber
by hao-cyber
ui_enhanced.py23.7 kB
"""Enhanced UI inspection and automation functions for Phone MCP.

This module extends basic UI functionality with advanced features for
UI testing and automation.
"""

import asyncio
import json
import re
import time
from typing import List, Dict, Any, Optional, Callable, Tuple, Union

from .ui import dump_ui, tap_element, find_element_by_text, find_element_by_id
from .interactions import swipe_screen, press_key, input_text
from ..core import run_command, check_device_connection


# Swipe directions accepted by scroll_to_element, in the order they are
# listed in its "Invalid direction" error message.
_SWIPE_DIRECTIONS = ("up", "down", "left", "right")


def _dump_failure(dump_response: str) -> str:
    """Build the JSON error returned when the UI dump cannot be processed."""
    return json.dumps(
        {
            "status": "error",
            "message": "Failed to process UI data",
            "raw_response": dump_response[:500],
        },
        indent=2,
    )


async def _search_ui_dump(
    predicate: Callable[[Dict[str, Any]], Any], summary: Dict[str, Any]
) -> str:
    """Dump the UI and return the elements accepted by ``predicate`` as JSON.

    Args:
        predicate: Called with each element dict; truthy results are kept.
        summary: Extra fields placed between "status" and "count" in the
            success payload (e.g. the query that was used).

    Returns:
        str: JSON success payload, the raw dump response when the dump itself
            did not report success, or a JSON error when the dump is not a
            JSON object.
    """
    dump_response = await dump_ui()
    try:
        dump_data = json.loads(dump_response)
    except json.JSONDecodeError:
        return _dump_failure(dump_response)

    # Valid JSON that is not an object (e.g. a list) has no "status" to read.
    if not isinstance(dump_data, dict):
        return _dump_failure(dump_response)

    # Pass upstream errors through unchanged so the caller sees the cause.
    if dump_data.get("status") != "success":
        return dump_response

    matches = [el for el in dump_data.get("elements", []) if predicate(el)]
    return json.dumps(
        {"status": "success", **summary, "count": len(matches), "elements": matches},
        indent=2,
    )


async def find_element_by_content_desc(
    content_desc: str, partial_match: bool = False
) -> str:
    """Find UI element by content description.

    Args:
        content_desc (str): Content description to search for
        partial_match (bool): If True, find elements containing the description

    Returns:
        str: JSON string with matching elements or error message
    """

    def matches(element: Dict[str, Any]) -> Any:
        element_desc = element.get("content-desc", "")
        return element_desc == content_desc or (
            partial_match and content_desc in element_desc
        )

    return await _search_ui_dump(
        matches, {"query": content_desc, "partial_match": partial_match}
    )


async def find_element_by_class(
    class_name: str, package_name: Optional[str] = None
) -> str:
    """Find UI element by class name.

    Both filters are substring matches against the element's "class" and
    "package" fields.

    Args:
        class_name (str): Class name to search for (e.g., "android.widget.Button")
        package_name (str, optional): Package name to limit search to

    Returns:
        str: JSON string with matching elements or error message
    """

    def matches(element: Dict[str, Any]) -> bool:
        if class_name not in element.get("class", ""):
            return False
        return package_name is None or package_name in element.get("package", "")

    return await _search_ui_dump(
        matches, {"query": class_name, "package_filter": package_name}
    )


async def find_clickable_elements() -> str:
    """Find all clickable elements on the screen.

    Returns:
        str: JSON string with clickable elements or error message
    """
    return await _search_ui_dump(lambda el: el.get("clickable", False), {})


def _element_finders() -> Dict[str, Callable[..., Any]]:
    """Map find-method names to the finder coroutines that take a search value.

    Built on every call (not once at import) so the finder names are resolved
    at call time, exactly as the per-function tables did before.
    """
    return {
        "text": find_element_by_text,
        "id": find_element_by_id,
        "content_desc": find_element_by_content_desc,
        "class": find_element_by_class,
    }


def _invalid_find_method(find_method: str, valid_methods: List[str]) -> str:
    """Build the JSON error for an unsupported ``find_method``."""
    return json.dumps(
        {
            "status": "error",
            "message": f"Invalid find method: {find_method}. Valid methods are: {', '.join(valid_methods)}",
        },
        indent=2,
    )


async def _run_find(
    find_method: str, search_value: str, kwargs: Dict[str, Any]
) -> Dict[str, Any]:
    """Run one lookup with an already validated method and decode its JSON.

    "clickable" takes no search value; "text" receives only the ``partial``
    option (default True); every other finder receives ``kwargs`` as-is.
    """
    if find_method == "clickable":
        response = await find_clickable_elements()
    elif find_method == "text":
        response = await find_element_by_text(search_value, kwargs.get("partial", True))
    else:
        response = await _element_finders()[find_method](search_value, **kwargs)
    return json.loads(response)


def _has_matches(response_data: Dict[str, Any]) -> bool:
    """Return True when a find response succeeded with at least one element."""
    return (
        response_data.get("status") == "success"
        and response_data.get("count", 0) > 0
    )


async def wait_for_element(
    find_method: str,
    search_value: str,
    timeout_seconds: int = 30,
    interval_seconds: float = 1.0,
    **kwargs,
) -> str:
    """Wait for an element to appear on the screen.

    Args:
        find_method (str): Method to use: "text", "id", "content_desc", "class", "clickable"
        search_value (str): Value to search for
        timeout_seconds (int): Maximum time to wait in seconds
        interval_seconds (float): Time between checks in seconds
        **kwargs: Additional arguments for the find method

    Returns:
        str: JSON string with element if found, or error message
    """
    valid_methods = [*_element_finders(), "clickable"]
    if find_method not in valid_methods:
        return _invalid_find_method(find_method, valid_methods)

    # Monotonic clock: a wall-clock adjustment must not stretch or cut the wait.
    started = time.monotonic()
    while time.monotonic() - started < timeout_seconds:
        try:
            response_data = await _run_find(find_method, search_value, kwargs)
            if _has_matches(response_data):
                elements = response_data.get("elements", [])
                return json.dumps(
                    {
                        "status": "success",
                        "message": f"Element found after {time.monotonic() - started:.1f} seconds",
                        "element": elements[0],
                        "all_elements": elements,
                    },
                    indent=2,
                )
        except Exception as e:
            # Best effort: a failed poll is reported and the wait continues.
            print(f"Error while waiting for element: {str(e)}")

        await asyncio.sleep(interval_seconds)

    return json.dumps(
        {
            "status": "error",
            "message": f"Element not found within {timeout_seconds} seconds",
            "find_method": find_method,
            "search_value": search_value,
        },
        indent=2,
    )


async def wait_until_element_gone(
    find_method: str,
    search_value: str,
    timeout_seconds: int = 30,
    interval_seconds: float = 1.0,
    **kwargs,
) -> str:
    """Wait for an element to disappear from the screen.

    Args:
        find_method (str): Method to use: "text", "id", "content_desc", "class"
        search_value (str): Value to search for
        timeout_seconds (int): Maximum time to wait in seconds
        interval_seconds (float): Time between checks in seconds
        **kwargs: Additional arguments for the find method

    Returns:
        str: JSON string with success message if element disappeared, or error message
    """
    valid_methods = list(_element_finders())
    if find_method not in valid_methods:
        return _invalid_find_method(find_method, valid_methods)

    # Monotonic clock: a wall-clock adjustment must not stretch or cut the wait.
    started = time.monotonic()
    while time.monotonic() - started < timeout_seconds:
        try:
            response_data = await _run_find(find_method, search_value, kwargs)
            # Only a successful lookup with zero hits proves the element is
            # gone; a failed lookup is treated as "still present".
            if (
                response_data.get("status") == "success"
                and response_data.get("count", 0) == 0
            ):
                return json.dumps(
                    {
                        "status": "success",
                        "message": f"Element disappeared after {time.monotonic() - started:.1f} seconds",
                    },
                    indent=2,
                )
        except Exception as e:
            print(f"Error while checking if element is gone: {str(e)}")

        await asyncio.sleep(interval_seconds)

    return json.dumps(
        {
            "status": "error",
            "message": f"Element still present after {timeout_seconds} seconds",
            "find_method": find_method,
            "search_value": search_value,
        },
        indent=2,
    )


def _swipe_vector(direction: str, width: int, height: int) -> Tuple[int, int, int, int]:
    """Return the (x1, y1, x2, y2) swipe for ``direction`` on a width x height screen.

    Swipes run between the 1/4 and 3/4 marks of the relevant axis, centred on
    the other axis. "down" and "right" drag toward the smaller coordinate;
    "up" and "left" drag toward the larger one.
    """
    mid_x, mid_y = width // 2, height // 2
    vectors = {
        "up": (mid_x, height // 4, mid_x, height * 3 // 4),
        "down": (mid_x, height * 3 // 4, mid_x, height // 4),
        "left": (width // 4, mid_y, width * 3 // 4, mid_y),
        "right": (width * 3 // 4, mid_y, width // 4, mid_y),
    }
    return vectors[direction]


async def scroll_to_element(
    find_method: str,
    search_value: str,
    direction: str = "down",
    max_swipes: int = 5,
    swipe_duration_ms: int = 500,
    **kwargs,
) -> str:
    """Scroll the screen until an element is found.

    Args:
        find_method (str): Method to use: "text", "id", "content_desc", "class"
        search_value (str): Value to search for
        direction (str): Direction to scroll: "up", "down", "left", "right"
        max_swipes (int): Maximum number of swipes to perform
        swipe_duration_ms (int): Duration of each swipe in milliseconds
        **kwargs: Additional arguments for the find method

    Returns:
        str: JSON string with element if found, or error message
    """
    valid_methods = list(_element_finders())
    if find_method not in valid_methods:
        return _invalid_find_method(find_method, valid_methods)

    # Reject a bad direction before spending an adb round-trip on screen size.
    if direction not in _SWIPE_DIRECTIONS:
        return json.dumps(
            {
                "status": "error",
                "message": f"Invalid direction: {direction}. Valid directions are: {', '.join(_SWIPE_DIRECTIONS)}",
            },
            indent=2,
        )

    # Screen size drives the swipe coordinates; fall back to 1080x1920 when
    # the command fails or its output has no "<width>x<height>" pair.
    screen_width, screen_height = 1080, 1920
    screen_size_response = await run_command("adb shell wm size")
    if screen_size_response[0]:
        match = re.search(r"(\d+)x(\d+)", screen_size_response[1])
        if match:
            screen_width, screen_height = int(match.group(1)), int(match.group(2))

    x1, y1, x2, y2 = _swipe_vector(direction, screen_width, screen_height)

    # The element may already be on screen, in which case no swipe is needed.
    try:
        response_data = await _run_find(find_method, search_value, kwargs)
        if _has_matches(response_data):
            return json.dumps(
                {
                    "status": "success",
                    "message": "Element already visible, no scrolling needed",
                    "element": response_data.get("elements", [])[0],
                },
                indent=2,
            )
    except Exception as e:
        # A failed lookup is reported and scrolling proceeds anyway.
        print(f"Error checking if element is already visible: {str(e)}")

    for i in range(max_swipes):
        await swipe_screen(x1, y1, x2, y2, swipe_duration_ms)

        # Give the screen a moment to settle before dumping the UI again.
        await asyncio.sleep(0.5)

        try:
            response_data = await _run_find(find_method, search_value, kwargs)
            if _has_matches(response_data):
                return json.dumps(
                    {
                        "status": "success",
                        "message": f"Element found after {i+1} swipes",
                        "element": response_data.get("elements", [])[0],
                        "swipes_performed": i + 1,
                    },
                    indent=2,
                )
        except Exception as e:
            print(f"Error checking if element is visible after swipe {i+1}: {str(e)}")

    return json.dumps(
        {
            "status": "error",
            "message": f"Element not found after {max_swipes} swipes",
            "find_method": find_method,
            "search_value": search_value,
            "direction": direction,
        },
        indent=2,
    )


async def element_exists(find_method: str, search_value: str, **kwargs) -> bool:
    """Check if an element exists on the screen.

    Args:
        find_method (str): Method to use: "text", "id", "content_desc", "class"
        search_value (str): Value to search for
        **kwargs: Additional arguments for the find method

    Returns:
        bool: True if element exists, False otherwise (including when the
            find method is invalid or the lookup raises)
    """
    if find_method not in _element_finders():
        return False

    try:
        return _has_matches(await _run_find(find_method, search_value, kwargs))
    except Exception as e:
        print(f"Error checking if element exists: {str(e)}")
        return False


def _text_status(result: str) -> str:
    """Classify a plain-text action result by whether it mentions "success"."""
    return "success" if "success" in result.lower() else "error"


def _step_record(
    index: int, action: str, status: str, message: Any, **extra: Any
) -> Dict[str, Any]:
    """Build one entry of the perform_action_chain result list (1-based step)."""
    return {
        "step": index + 1,
        "action": action,
        "status": status,
        "message": message,
        **extra,
    }


async def perform_action_chain(actions: List[Dict[str, Any]]) -> str:
    """Perform a chain of UI actions in sequence.

    A failing or unknown action is recorded as an error step and the chain
    continues with the next action.

    Args:
        actions (List[Dict]): List of action dictionaries, each with:
            - action_type: "tap", "swipe", "wait", "input", "key", "find", "scroll"
            - params: Dictionary of parameters for the action

    Returns:
        str: JSON string with results of each action
    """
    results = []

    for i, action in enumerate(actions):
        # Tolerate explicit nulls for either field instead of raising.
        action_type = str(action.get("action_type") or "").lower()
        params = action.get("params") or {}

        try:
            if action_type == "tap":
                if "element" in params:
                    result = await tap_element(json.dumps(params["element"]))
                else:
                    from .interactions import tap_screen

                    result = await tap_screen(params.get("x", 0), params.get("y", 0))
                results.append(_step_record(i, "tap", _text_status(result), result))

            elif action_type == "swipe":
                result = await swipe_screen(
                    params.get("x1", 0),
                    params.get("y1", 0),
                    params.get("x2", 0),
                    params.get("y2", 0),
                    params.get("duration_ms", 300),
                )
                results.append(_step_record(i, "swipe", _text_status(result), result))

            elif action_type == "wait":
                seconds = params.get("seconds", 1.0)
                await asyncio.sleep(seconds)
                results.append(
                    _step_record(i, "wait", "success", f"Waited for {seconds} seconds")
                )

            elif action_type == "input":
                result = await input_text(params.get("text", ""))
                results.append(_step_record(i, "input", _text_status(result), result))

            elif action_type == "key":
                result = await press_key(params.get("key", ""))
                results.append(_step_record(i, "key", _text_status(result), result))

            elif action_type == "find":
                find_method = params.get("method", "text")
                search_value = params.get("value", "")
                additional_params = params.get("additional_params", {})

                # Explicit finder table rather than a globals() name lookup.
                find_func = _element_finders().get(find_method)
                if find_func is None:
                    results.append(
                        _step_record(
                            i, "find", "error", f"Invalid find method: {find_method}"
                        )
                    )
                    continue

                result = await find_func(search_value, **additional_params)
                # "success" here means the lookup ran; the outcome of the
                # lookup itself is carried in "result".
                results.append(
                    _step_record(
                        i,
                        "find",
                        "success",
                        "Find operation completed",
                        result=(
                            json.loads(result) if isinstance(result, str) else result
                        ),
                    )
                )

            elif action_type == "scroll":
                result = await scroll_to_element(
                    params.get("method", "text"),
                    params.get("value", ""),
                    params.get("direction", "down"),
                    params.get("max_swipes", 5),
                    **params.get("additional_params", {}),
                )
                # Read the status from the parsed payload. A substring test is
                # fooled when the echoed search value contains "success".
                try:
                    payload = json.loads(result)
                except (TypeError, ValueError):
                    payload = result
                scrolled = isinstance(payload, dict) and payload.get("status") == "success"
                results.append(
                    _step_record(
                        i, "scroll", "success" if scrolled else "error", payload
                    )
                )

            else:
                results.append(
                    _step_record(
                        i, action_type, "error", f"Unknown action type: {action_type}"
                    )
                )

        except Exception as e:
            # One failing step must not abort the remaining steps.
            results.append(
                _step_record(
                    i, action_type, "error", f"Error executing action: {str(e)}"
                )
            )

    return json.dumps(
        {"status": "complete", "actions_performed": len(results), "results": results},
        indent=2,
    )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hao-cyber/phone-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.