Skip to main content
Glama
hao-cyber
by hao-cyber
messaging.py14.7 kB
"""Messaging-related phone control functions.""" import asyncio import subprocess import json from ..core import run_command from ..config import DEFAULT_COUNTRY_CODE async def send_text_message(phone_number: str, message: str) -> str: """Send a text message to the specified number. Uses the phone's messaging app with UI automation to send SMS. Process: Opens messaging app, fills recipient and content, automatically clicks send button, then auto-exits app. Args: phone_number (str): Recipient's phone number. Country code will be automatically added if not included. Example: "13812345678" or "+8613812345678" message (str): SMS content. Supports any text, including emojis. Example: "Hello, this is a test message" Returns: str: String description of the operation result: - Success: "Text message sent to {phone_number}" - Failure: Message containing error reason, like "Failed to open messaging app: {error}" or "Failed to navigate to send button: {error}" """ # Add country code if not already included if not phone_number.startswith("+"): phone_number = DEFAULT_COUNTRY_CODE + phone_number # Validate phone number format if not phone_number[1:].isdigit(): return "Invalid phone number format. Please use numeric digits only." # Escape single quotes in the message escaped_message = message.replace("'", "\\'") # Open messaging app with the number and message cmd = f"adb shell am start -a android.intent.action.SENDTO -d sms:{phone_number} --es sms_body '{escaped_message}' --ez exit_on_sent true" success, output = await run_command(cmd) # Open messaging app with the number and message, and auto-exit after sending if not success: return f"Failed to open messaging app: {output}" # Give the app time to open (increased wait time) await asyncio.sleep(3) # Get screen dimensions screen_cmd = "adb shell wm size" screen_success, screen_output = await run_command(screen_cmd) width, height = 1080, 2340 # Default values for common phones if screen_success and "Physical size:" in screen_output: try: # Example output: "Physical size: 1080x2340" size_str = screen_output.split("Physical size:")[1].strip() width, height = map(int, size_str.split("x")) except: # If parsing fails, use default values pass # Try different methods to send the message # Method 1: Try using coordinates to tap the send button (usually bottom right) # For MIUI/Xiaomi phones, the send button is typically a green circle in the bottom right send_x = int(width * 0.9) # 90% to the right send_y = int(height * 0.95) # 95% down from top tap_cmd = f"adb shell input tap {send_x} {send_y}" tap_success, tap_output = await run_command(tap_cmd) # Wait for the message to be sent await asyncio.sleep(2) # If tapping specific coordinates didn't work, try the original method if not tap_success: # Original method: Try using arrow keys to navigate to send button # Press right button to focus on send button (keyevent 22) await run_command("adb shell input keyevent 22") # Press enter to send the message (keyevent 66) await run_command("adb shell input keyevent 66") await asyncio.sleep(1) # In case auto-exit doesn't work, press BACK once to exit the messaging app await run_command("adb shell input keyevent 4") return f"Text message sent to {phone_number}" async def receive_text_messages(limit: int = 5) -> str: """Get recent text messages from the phone. Retrieves recent SMS messages from the device's SMS database using ADB and content provider queries to get structured message data. Args: limit (int): Maximum number of messages to retrieve (default: 5) Example: 10 will return the 10 most recent messages Returns: str: JSON string containing messages or an error message: - Success: Formatted JSON string with list of messages, each with fields: - address: Sender's number - body: Message content - date: Timestamp - formatted_date: Human-readable date time (like "2023-07-25 14:30:22") - Failure: Text message describing the error, like "No recent text messages found..." """ # Check for connected device from ..core import check_device_connection connection_status = await check_device_connection() if "ready" not in connection_status: return connection_status # Try the SMS database query directly first, known to sometimes work cmd = "adb shell content query --uri content://sms/inbox --projection address,body,date" success, output = await run_command(cmd) # First, check if the output contains error or help text if "usage:" in output or output.startswith("Error"): # Let's try a direct ADB shell command to get recent SMS content try: # This is a direct shell command that might be more reliable - use double quotes for Windows compatibility shell_cmd = 'adb shell "echo -n \\"MESSAGES_START\\"; content query --uri content://sms/inbox --projection address,body,date; echo -n \\"MESSAGES_END\\""' success, output = await run_command(shell_cmd) if success and "MESSAGES_START" in output and "MESSAGES_END" in output: # Extract just the content between our markers content = ( output.split("MESSAGES_START")[1].split("MESSAGES_END")[0].strip() ) if content and "Row:" in content: # Process the messages messages = [] rows = content.split("Row:") rows = [r for r in rows if r.strip()] for row in rows: message = {} # Split by comma but be careful of commas inside the message content parts = [] current_part = "" for char in row: if char == "," and not current_part.count("=") > 0: parts.append(current_part.strip()) current_part = "" else: current_part += char if current_part: parts.append(current_part.strip()) for part in parts: if "=" in part: key, value = part.split("=", 1) message[key.strip()] = value.strip() # Format the date if present if "date" in message: try: timestamp = int(message["date"]) import datetime date_str = datetime.datetime.fromtimestamp( timestamp / 1000 ).strftime("%Y-%m-%d %H:%M:%S") message["formatted_date"] = date_str except: pass if message: messages.append(message) if messages: messages = messages[:limit] return json.dumps(messages, indent=2) except Exception as e: # If this fails, we have other methods to try pass # Try the most basic approach - since we know it works via direct ADB try: # This directly parses the output we know works with adb shell direct_cmd = "adb shell content query --uri content://sms/inbox --projection address,body,date" success, output = await run_command(direct_cmd) if success and output and "Row:" in output: messages = [] # Parse the output following the known format # Example output: "Row: 0 address=13831151111, body=Meeting at 7pm tonight, date=1744270871178" rows = output.split("Row:") rows = [r for r in rows if r.strip()] for row in rows: # Extract address, body, and date directly with string operations address = "" body = "" date = "" if "address=" in row: address_start = row.find("address=") + 8 address_end = row.find(",", address_start) if address_end > 0: address = row[address_start:address_end].strip() if "body=" in row: body_start = row.find("body=") + 5 body_end = row.find(", date=") if body_end > 0: body = row[body_start:body_end].strip() else: # If date isn't found, body might be the last field body = row[body_start:].strip() if "date=" in row: date_start = row.find("date=") + 5 date_text = row[date_start:].strip() date = date_text if address or body: message = {"address": address, "body": body, "date": date} # Format date if possible if date: try: timestamp = int(date) import datetime date_str = datetime.datetime.fromtimestamp( timestamp / 1000 ).strftime("%Y-%m-%d %H:%M:%S") message["formatted_date"] = date_str except: pass messages.append(message) if messages: messages = messages[:limit] return json.dumps(messages, indent=2) except Exception as e: pass # Continue to next approach # Return a helpful error message with the raw output for debugging return json.dumps( { "error": "Could not retrieve SMS messages", "possible_reasons": [ "No SMS messages in inbox", "Permission issues with SMS access", "Device restrictions", ], "raw_output_sample": output[:200] if output else "No output received", "suggested_fix": "Try sending a test SMS to the device or check permissions", }, indent=2, ) async def get_sent_messages(limit: int = 5) -> str: """Get recently sent text messages from the phone. Retrieves sent SMS messages from the device's SMS database. This provides a complete list of messages that were successfully sent from this device. Args: limit (int): Maximum number of sent messages to retrieve (default: 5) Returns: str: JSON string containing sent messages with: - from: Sender phone number (device owner) - to: Recipient phone number - text: Message content - date: Timestamp - formatted_date: Human-readable date time (like "2023-07-25 14:30:22") """ # Check for connected device from ..core import check_device_connection connection_status = await check_device_connection() if "ready" not in connection_status: return connection_status cmd = "adb shell content query --uri content://sms/sent --projection address,body,date" success, output = await run_command(cmd) if not success or not output or not "Row:" in output: return "Unable to retrieve sent SMS messages. Device may not have any sent messages or may restrict access." # Direct string parsing, avoiding JSON and complex processing result = [] try: rows = output.strip().split("Row:") rows = [r for r in rows if r.strip()] count = 0 for row in rows: if count >= limit: break # Extract components message = {} # Extract address (recipient for sent messages) if "address=" in row: address_start = row.find("address=") + 8 address_end = row.find(",", address_start) if address_end > 0: message["to"] = row[address_start:address_end].strip() # Extract body if "body=" in row: body_start = row.find("body=") + 5 body_end = row.find(", date=") if body_end > 0: message["text"] = row[body_start:body_end].strip() else: # If date isn't found, body might be the last field message["text"] = row[body_start:].strip() # Extract date if "date=" in row: date_start = row.find("date=") + 5 date_str = row[date_start:].strip() message["date"] = date_str # Format date try: timestamp = int(date_str) import datetime message["formatted_date"] = datetime.datetime.fromtimestamp( timestamp / 1000 ).strftime("%Y-%m-%d %H:%M:%S") except: message["formatted_date"] = "Unknown date" if "to" in message and "text" in message: result.append(message) count += 1 if not result: return "No sent SMS messages found." # Format for human-readable output output_lines = [f"Found {len(result)} sent SMS messages:"] for i, msg in enumerate(result, 1): to_line = f"To: {msg.get('to', 'Unknown')}" date_line = f"Date: {msg.get('formatted_date', 'Unknown')}" text_line = msg.get("text", "No content") output_lines.append(f"\n{i}. {to_line} - {date_line}") output_lines.append(f" {text_line}") return "\n".join(output_lines) except Exception as e: return f"Error parsing sent SMS messages: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hao-cyber/phone-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server