mac-messages-mcp

  • mac_messages_mcp
""" Core functionality for interacting with macOS Messages app """ import os import re import sqlite3 import subprocess import json import time import difflib from datetime import datetime, timedelta, timezone from typing import List, Optional, Dict, Any, Tuple import glob def run_applescript(script: str) -> str: """Run an AppleScript and return the result.""" proc = subprocess.Popen(['osascript', '-e', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = proc.communicate() if proc.returncode != 0: return f"Error: {err.decode('utf-8')}" return out.decode('utf-8').strip() def get_chat_mapping() -> Dict[str, str]: """ Get mapping from room_name to display_name in chat table """ conn = sqlite3.connect(get_messages_db_path()) cursor = conn.cursor() cursor.execute("SELECT room_name, display_name FROM chat") result_set = cursor.fetchall() mapping = {room_name: display_name for room_name, display_name in result_set} conn.close() return mapping def extract_body_from_attributed(attributed_body): """ Extract message content from attributedBody binary data """ if attributed_body is None: return None try: # Try to decode attributedBody decoded = attributed_body.decode('utf-8', errors='replace') # Extract content using pattern matching if "NSNumber" in decoded: decoded = decoded.split("NSNumber")[0] if "NSString" in decoded: decoded = decoded.split("NSString")[1] if "NSDictionary" in decoded: decoded = decoded.split("NSDictionary")[0] decoded = decoded[6:-12] return decoded except Exception as e: print(f"Error extracting from attributedBody: {e}") return None def get_messages_db_path() -> str: """Get the path to the Messages database.""" home_dir = os.path.expanduser("~") return os.path.join(home_dir, "Library/Messages/chat.db") def query_messages_db(query: str, params: tuple = ()) -> List[Dict[str, Any]]: """Query the Messages database and return results as a list of dictionaries.""" try: db_path = get_messages_db_path() # Check if the database file exists and is accessible if not os.path.exists(db_path): return [{"error": f"Messages database not found at {db_path}"}] # Try to connect to the database try: conn = sqlite3.connect(db_path) except sqlite3.OperationalError as e: return [{"error": f"Cannot access Messages database. Please grant Full Disk Access permission to your terminal application in System Preferences > Security & Privacy > Privacy > Full Disk Access. Error: {str(e)} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE."}] conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(query, params) results = [dict(row) for row in cursor.fetchall()] conn.close() return results except Exception as e: return [{"error": str(e)}] def normalize_phone_number(phone: str) -> str: """ Normalize a phone number by removing all non-digit characters. """ if not phone: return "" return ''.join(c for c in phone if c.isdigit()) # Global cache for contacts map _CONTACTS_CACHE = None _LAST_CACHE_UPDATE = 0 _CACHE_TTL = 300 # 5 minutes in seconds def clean_name(name: str) -> str: """ Clean a name by removing emojis and extra whitespace. """ # Remove emoji and other non-alphanumeric characters except spaces, hyphens, and apostrophes emoji_pattern = re.compile( "[" "\U0001F600-\U0001F64F" # emoticons "\U0001F300-\U0001F5FF" # symbols & pictographs "\U0001F680-\U0001F6FF" # transport & map symbols "\U0001F700-\U0001F77F" # alchemical symbols "\U0001F780-\U0001F7FF" # Geometric Shapes "\U0001F800-\U0001F8FF" # Supplemental Arrows-C "\U0001F900-\U0001F9FF" # Supplemental Symbols and Pictographs "\U0001FA00-\U0001FA6F" # Chess Symbols "\U0001FA70-\U0001FAFF" # Symbols and Pictographs Extended-A "\U00002702-\U000027B0" # Dingbats "\U000024C2-\U0001F251" "]+" ) name = emoji_pattern.sub(r'', name) # Keep alphanumeric, spaces, apostrophes, and hyphens name = re.sub(r'[^\w\s\'\-]', '', name, flags=re.UNICODE) # Remove extra whitespace name = re.sub(r'\s+', ' ', name).strip() return name def fuzzy_match(query: str, candidates: List[Tuple[str, Any]], threshold: float = 0.6) -> List[Tuple[str, Any, float]]: """ Find fuzzy matches between query and a list of candidates. Args: query: The search string candidates: List of (name, value) tuples to search through threshold: Minimum similarity score (0-1) to consider a match Returns: List of (name, value, score) tuples for matches, sorted by score """ query = clean_name(query).lower() results = [] for name, value in candidates: clean_candidate = clean_name(name).lower() # Try exact match first (case insensitive) if query == clean_candidate: results.append((name, value, 1.0)) continue # Check if query is a substring of the candidate if query in clean_candidate: # Longer substring matches get higher scores score = len(query) / len(clean_candidate) * 0.9 # max 0.9 for substring if score >= threshold: results.append((name, value, score)) continue # Otherwise use difflib for fuzzy matching score = difflib.SequenceMatcher(None, query, clean_candidate).ratio() if score >= threshold: results.append((name, value, score)) # Sort results by score (highest first) return sorted(results, key=lambda x: x[2], reverse=True) def query_addressbook_db(query: str, params: tuple = ()) -> List[Dict[str, Any]]: """Query the AddressBook database and return results as a list of dictionaries.""" try: # Find the AddressBook database paths home_dir = os.path.expanduser("~") sources_path = os.path.join(home_dir, "Library/Application Support/AddressBook/Sources/*/AddressBook-v22.abcddb") db_paths = glob.glob(sources_path) if not db_paths: return [{"error": f"AddressBook database not found at {sources_path} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE."}] # Try each database path until one works all_results = [] for db_path in db_paths: try: conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(query, params) results = [dict(row) for row in cursor.fetchall()] conn.close() all_results.extend(results) except sqlite3.OperationalError as e: # If we can't access this one, try the next database print(f"Warning: Cannot access {db_path}: {str(e)}") continue if not all_results and len(db_paths) > 0: return [{"error": f"Could not access any AddressBook databases. Please grant Full Disk Access permission. PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE."}] return all_results except Exception as e: return [{"error": str(e)}] def get_addressbook_contacts() -> Dict[str, str]: """ Query the macOS AddressBook database to get contacts and their phone numbers. Returns a dictionary mapping normalized phone numbers to contact names. """ contacts_map = {} # Define the query to get contact names and phone numbers query = """ SELECT ZABCDRECORD.ZFIRSTNAME as first_name, ZABCDRECORD.ZLASTNAME as last_name, ZABCDPHONENUMBER.ZFULLNUMBER as phone FROM ZABCDRECORD LEFT JOIN ZABCDPHONENUMBER ON ZABCDRECORD.Z_PK = ZABCDPHONENUMBER.ZOWNER WHERE ZABCDPHONENUMBER.ZFULLNUMBER IS NOT NULL ORDER BY ZABCDRECORD.ZLASTNAME, ZABCDRECORD.ZFIRSTNAME, ZABCDPHONENUMBER.ZORDERINGINDEX ASC """ try: # For testing/fallback, parse the user-provided examples in cases where direct DB access fails # This is a temporary workaround until full disk access is granted if 'USE_TEST_DATA' in os.environ and os.environ['USE_TEST_DATA'].lower() == 'true': contacts = [ {"first_name":"TEST", "last_name":"TEST", "phone":"+11111111111"} ] return process_contacts(contacts) # Try to query database directly results = query_addressbook_db(query) if results and "error" in results[0]: print(f"Error getting AddressBook contacts: {results[0]['error']}") # Fall back to subprocess method if direct DB access fails return get_addressbook_contacts_subprocess() return process_contacts(results) except Exception as e: print(f"Error getting AddressBook contacts: {str(e)}") return {} def process_contacts(contacts) -> Dict[str, str]: """Process contact records into a normalized phone -> name map""" contacts_map = {} name_to_numbers = {} # For reverse lookup for contact in contacts: try: first_name = contact.get("first_name", "") last_name = contact.get("last_name", "") phone = contact.get("phone", "") # Skip entries without phone numbers if not phone: continue # Clean up phone number and remove any image metadata if "X-IMAGETYPE" in phone: phone = phone.split("X-IMAGETYPE")[0] # Create full name full_name = " ".join(filter(None, [first_name, last_name])) if not full_name.strip(): continue # Normalize phone number and add to map normalized_phone = normalize_phone_number(phone) if normalized_phone: contacts_map[normalized_phone] = full_name # Add to reverse lookup if full_name not in name_to_numbers: name_to_numbers[full_name] = [] name_to_numbers[full_name].append(normalized_phone) except Exception as e: # Skip individual entries that fail to process print(f"Error processing contact: {str(e)}") continue # Store the reverse lookup in a global variable for later use global _NAME_TO_NUMBERS_MAP _NAME_TO_NUMBERS_MAP = name_to_numbers return contacts_map def get_addressbook_contacts_subprocess() -> Dict[str, str]: """ Legacy method to get contacts using subprocess. Only used as fallback when direct database access fails. """ contacts_map = {} try: # Form the SQL query to execute via command line cmd = """ sqlite3 ~/Library/"Application Support"/AddressBook/Sources/*/AddressBook-v22.abcddb<<EOF .mode json SELECT DISTINCT ZABCDRECORD.ZFIRSTNAME [FIRST NAME], ZABCDRECORD.ZLASTNAME [LAST NAME], ZABCDPHONENUMBER.ZFULLNUMBER [FULL NUMBER] FROM ZABCDRECORD LEFT JOIN ZABCDPHONENUMBER ON ZABCDRECORD.Z_PK = ZABCDPHONENUMBER.ZOWNER ORDER BY ZABCDRECORD.ZLASTNAME, ZABCDRECORD.ZFIRSTNAME, ZABCDPHONENUMBER.ZORDERINGINDEX ASC; EOF """ # Execute the command result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode == 0: # Parse the JSON output line by line (it's a series of JSON objects) for line in result.stdout.strip().split('\n'): if not line.strip(): continue # Remove trailing commas that might cause JSON parsing errors line = line.rstrip(',') try: contact = json.loads(line) first_name = contact.get("FIRST NAME", "") last_name = contact.get("LAST NAME", "") phone = contact.get("FULL NUMBER", "") # Process contact as in the main method if not phone: continue if "X-IMAGETYPE" in phone: phone = phone.split("X-IMAGETYPE")[0] full_name = " ".join(filter(None, [first_name, last_name])) if not full_name.strip(): continue normalized_phone = normalize_phone_number(phone) if normalized_phone: contacts_map[normalized_phone] = full_name except json.JSONDecodeError: # Skip individual lines that fail to parse continue except Exception as e: print(f"Error getting AddressBook contacts via subprocess: {str(e)} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE.") return contacts_map # Global variable for reverse contact lookup _NAME_TO_NUMBERS_MAP = {} def get_cached_contacts() -> Dict[str, str]: """Get cached contacts map or refresh if needed""" global _CONTACTS_CACHE, _LAST_CACHE_UPDATE current_time = time.time() if _CONTACTS_CACHE is None or (current_time - _LAST_CACHE_UPDATE) > _CACHE_TTL: _CONTACTS_CACHE = get_addressbook_contacts() _LAST_CACHE_UPDATE = current_time return _CONTACTS_CACHE def find_contact_by_name(name: str) -> List[Dict[str, Any]]: """ Find contacts by name using fuzzy matching. Args: name: The name to search for Returns: List of matching contacts (may be multiple if ambiguous) """ contacts = get_cached_contacts() # Build a list of (name, phone) pairs to search through candidates = [(contact_name, phone) for phone, contact_name in contacts.items()] # Perform fuzzy matching matches = fuzzy_match(name, candidates) # Convert to a list of contact dictionaries results = [] for contact_name, phone, score in matches: results.append({ "name": contact_name, "phone": phone, "score": score }) return results def send_message(recipient: str, message: str, group_chat: bool = False) -> str: """ Send a message using the Messages app with improved contact resolution. Args: recipient: Phone number, email, contact name, or special format for contact selection Use "contact:N" to select the Nth contact from a previous ambiguous match message: Message text to send group_chat: Whether this is a group chat (uses chat ID instead of buddy) Returns: Success or error message """ # Convert to string to ensure phone numbers work properly recipient = str(recipient).strip() # Handle contact selection format (contact:N) if recipient.lower().startswith("contact:"): try: # Get the selected index (1-based) index = int(recipient.split(":", 1)[1].strip()) - 1 # Get the most recent contact matches from global cache if not hasattr(send_message, "recent_matches") or not send_message.recent_matches: return "No recent contact matches available. Please search for a contact first." if index < 0 or index >= len(send_message.recent_matches): return f"Invalid selection. Please choose a number between 1 and {len(send_message.recent_matches)}." # Get the selected contact contact = send_message.recent_matches[index] return _send_message_to_recipient(contact['phone'], message, contact['name'], group_chat) except (ValueError, IndexError) as e: return f"Error selecting contact: {str(e)}" # Check if recipient is directly a phone number if all(c.isdigit() or c in '+- ()' for c in recipient): # Clean the phone number clean_number = ''.join(c for c in recipient if c.isdigit()) return _send_message_to_recipient(clean_number, message, group_chat=group_chat) # Try to find the contact by name contacts = find_contact_by_name(recipient) if not contacts: return f"Error: Could not find any contact matching '{recipient}'" if len(contacts) == 1: # Single match, use it contact = contacts[0] return _send_message_to_recipient(contact['phone'], message, contact['name'], group_chat) else: # Store the matches for later selection send_message.recent_matches = contacts # Multiple matches, return them all contact_list = "\n".join([f"{i+1}. {c['name']} ({c['phone']})" for i, c in enumerate(contacts[:10])]) return f"Multiple contacts found matching '{recipient}'. Please specify which one using 'contact:N' where N is the number:\n{contact_list}" # Initialize the static variable for recent matches send_message.recent_matches = [] def _send_message_to_recipient(recipient: str, message: str, contact_name: str = None, group_chat: bool = False) -> str: """ Internal function to send a message to a specific recipient using file-based approach. Args: recipient: Phone number or email message: Message text to send contact_name: Optional contact name for the success message group_chat: Whether this is a group chat Returns: Success or error message """ try: # Create a temporary file with the message content file_path = os.path.abspath('imessage_tmp.txt') with open(file_path, 'w') as f: f.write(message) # Adjust the AppleScript command based on whether this is a group chat if not group_chat: command = f'tell application "Messages" to send (read (POSIX file "{file_path}") as «class utf8») to buddy "{recipient}"' else: command = f'tell application "Messages" to send (read (POSIX file "{file_path}") as «class utf8») to chat "{recipient}"' # Run the AppleScript result = run_applescript(command) # Clean up the temporary file try: os.remove(file_path) except: pass # Check result if result.startswith("Error:"): # Try fallback to direct method return _send_message_direct(recipient, message, contact_name, group_chat) # Message sent successfully display_name = contact_name if contact_name else recipient return f"Message sent successfully to {display_name}" except Exception as e: # Try fallback method return _send_message_direct(recipient, message, contact_name, group_chat) def get_contact_name(handle_id: int) -> str: """ Get contact name from handle_id with improved contact lookup. """ if handle_id is None: return "Unknown" # First, get the phone number or email handle_query = """ SELECT id FROM handle WHERE ROWID = ? """ handles = query_messages_db(handle_query, (handle_id,)) if not handles or "error" in handles[0]: return "Unknown" handle_id_value = handles[0]["id"] # Try to match with AddressBook contacts contacts = get_cached_contacts() normalized_handle = normalize_phone_number(handle_id_value) # Try different variations of the number for matching if normalized_handle in contacts: return contacts[normalized_handle] # Sometimes numbers in the addressbook have the country code, but messages don't if normalized_handle.startswith('1') and len(normalized_handle) > 10: # Try without country code if normalized_handle[1:] in contacts: return contacts[normalized_handle[1:]] elif len(normalized_handle) == 10: # US number without country code # Try with country code if '1' + normalized_handle in contacts: return contacts['1' + normalized_handle] # If no match found in AddressBook, fall back to display name from chat contact_query = """ SELECT c.display_name FROM handle h JOIN chat_handle_join chj ON h.ROWID = chj.handle_id JOIN chat c ON chj.chat_id = c.ROWID WHERE h.id = ? LIMIT 1 """ contacts = query_messages_db(contact_query, (handle_id_value,)) if contacts and len(contacts) > 0 and "display_name" in contacts[0] and contacts[0]["display_name"]: return contacts[0]["display_name"] # If no contact name found, return the phone number or email return handle_id_value def get_recent_messages(hours: int = 24, contact: Optional[str] = None) -> str: """ Get recent messages from the Messages app using attributedBody for content. Args: hours: Number of hours to look back (default: 24) contact: Filter by contact name, phone number, or email (optional) Use "contact:N" to select a specific contact from previous matches Returns: Formatted string with recent messages """ handle_id = None # If contact is specified, try to resolve it if contact: # Convert to string to ensure phone numbers work properly contact = str(contact).strip() # Handle contact selection format (contact:N) if contact.lower().startswith("contact:"): try: # Get the selected index (1-based) index = int(contact.split(":", 1)[1].strip()) - 1 # Get the most recent contact matches from global cache if not hasattr(get_recent_messages, "recent_matches") or not get_recent_messages.recent_matches: return "No recent contact matches available. Please search for a contact first." if index < 0 or index >= len(get_recent_messages.recent_matches): return f"Invalid selection. Please choose a number between 1 and {len(get_recent_messages.recent_matches)}." # Get the selected contact's phone number contact = get_recent_messages.recent_matches[index]['phone'] except (ValueError, IndexError) as e: return f"Error selecting contact: {str(e)}" # Check if contact might be a name rather than a phone number or email if not all(c.isdigit() or c in '+- ()@.' for c in contact): # Try fuzzy matching matches = find_contact_by_name(contact) if not matches: return f"No contacts found matching '{contact}'." if len(matches) == 1: # Single match, use its phone number contact = matches[0]['phone'] else: # Store the matches for later selection get_recent_messages.recent_matches = matches # Multiple matches, return them all contact_list = "\n".join([f"{i+1}. {c['name']} ({c['phone']})" for i, c in enumerate(matches[:10])]) return f"Multiple contacts found matching '{contact}'. Please specify which one using 'contact:N' where N is the number:\n{contact_list}" # At this point, contact should be a phone number or email # Try to find handle_id with improved phone number matching if '@' in contact: # This is an email query = "SELECT ROWID FROM handle WHERE id = ?" results = query_messages_db(query, (contact,)) if results and not "error" in results[0] and len(results) > 0: handle_id = results[0]["ROWID"] else: # This is a phone number - try various formats handle_id = find_handle_by_phone(contact) if not handle_id: # Try a direct search in message table to see if any messages exist normalized = normalize_phone_number(contact) query = """ SELECT COUNT(*) as count FROM message m JOIN handle h ON m.handle_id = h.ROWID WHERE h.id LIKE ? """ results = query_messages_db(query, (f"%{normalized}%",)) if results and not "error" in results[0] and results[0].get("count", 0) == 0: # No messages found but the query was valid return f"No message history found with '{contact}'." else: # Could not find the handle at all return f"Could not find any messages with contact '{contact}'. Verify the phone number or email is correct." # Calculate the timestamp for X hours ago current_time = datetime.now(timezone.utc) hours_ago = current_time - timedelta(hours=hours) # Convert to Apple's timestamp format (seconds since 2001-01-01) apple_epoch = datetime(2001, 1, 1, tzinfo=timezone.utc) seconds_since_apple_epoch = int((hours_ago - apple_epoch).total_seconds()) # Make sure we're using a string representation for the timestamp # to avoid integer overflow issues when binding to SQLite timestamp_str = str(seconds_since_apple_epoch) # Build the SQL query - use attributedBody field and text query = """ SELECT m.ROWID, m.date, m.text, m.attributedBody, m.is_from_me, m.handle_id, m.cache_roomnames FROM message m WHERE CAST(m.date AS TEXT) > ? """ params = (timestamp_str,) # Add contact filter if handle_id was found if handle_id: query += "AND m.handle_id = ? " params = (timestamp_str, handle_id) query += "ORDER BY m.date DESC LIMIT 100" # Execute the query messages = query_messages_db(query, params) # Format the results if not messages: return "No messages found in the specified time period." if "error" in messages[0]: return f"Error accessing messages: {messages[0]['error']}" # Get chat mapping for group chat names chat_mapping = get_chat_mapping() formatted_messages = [] for msg in messages: # Get the message content from text or attributedBody if msg.get('text'): body = msg['text'] elif msg.get('attributedBody'): body = extract_body_from_attributed(msg['attributedBody']) if not body: # Skip messages with no content continue else: # Skip empty messages continue # Convert Apple timestamp to readable date try: # Convert Apple timestamp to datetime date_string = '2001-01-01' mod_date = datetime.strptime(date_string, '%Y-%m-%d') unix_timestamp = int(mod_date.timestamp()) * 1000000000 # Handle both nanosecond and second format timestamps msg_timestamp = int(msg["date"]) if len(str(msg_timestamp)) > 10: # It's in nanoseconds new_date = int((msg_timestamp + unix_timestamp) / 1000000000) else: # It's already in seconds new_date = mod_date.timestamp() + msg_timestamp date_str = datetime.fromtimestamp(new_date).strftime("%Y-%m-%d %H:%M:%S") except (ValueError, TypeError, OverflowError) as e: # If conversion fails, use a placeholder date_str = "Unknown date" print(f"Date conversion error: {e} for timestamp {msg['date']}") direction = "You" if msg["is_from_me"] else get_contact_name(msg["handle_id"]) # Check if this is a group chat group_chat_name = None if msg.get('cache_roomnames'): group_chat_name = chat_mapping.get(msg['cache_roomnames']) message_prefix = f"[{date_str}]" if group_chat_name: message_prefix += f" [{group_chat_name}]" formatted_messages.append( f"{message_prefix} {direction}: {body}" ) if not formatted_messages: return "No messages found in the specified time period." return "\n".join(formatted_messages) # Initialize the static variable for recent matches get_recent_messages.recent_matches = [] def _send_message_direct(recipient: str, message: str, contact_name: str = None, group_chat: bool = False) -> str: """ Fallback direct AppleScript method for sending messages. """ # Clean the inputs for AppleScript safe_message = message.replace('"', '\\"').replace('\\', '\\\\') safe_recipient = recipient.replace('"', '\\"') # Different script based on group_chat flag if not group_chat: script = f''' tell application "Messages" set targetService to 1st service whose service type = iMessage try -- Try to get the existing buddy if possible set targetBuddy to buddy "{safe_recipient}" of targetService -- Send the message send "{safe_message}" to targetBuddy -- Wait briefly to check for immediate errors delay 1 -- Return success return "success" on error errMsg -- If getting buddy fails, try to create a new conversation try set newMessage to send "{safe_message}" to "{safe_recipient}" return "success" on error errMsg2 -- Both methods failed return "error:" & errMsg2 end try end try end tell ''' else: script = f''' tell application "Messages" try -- Try to get the existing chat set targetChat to chat "{safe_recipient}" -- Send the message send "{safe_message}" to targetChat -- Wait briefly to check for immediate errors delay 1 -- Return success return "success" on error errMsg -- Chat method failed return "error:" & errMsg end try end tell ''' try: result = run_applescript(script) if result.startswith("error:"): return f"Error sending message: {result[6:]}" elif result.strip() == "success": display_name = contact_name if contact_name else recipient return f"Message sent successfully to {display_name}" else: return f"Unknown result: {result}" except Exception as e: return f"Error sending message: {str(e)}" def check_messages_db_access() -> str: """Check if the Messages database is accessible and return detailed information.""" try: db_path = get_messages_db_path() status = [] # Check if the file exists if not os.path.exists(db_path): return f"ERROR: Messages database not found at {db_path} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE." status.append(f"Database file exists at: {db_path}") # Check file permissions try: with open(db_path, 'rb') as f: # Just try to read a byte to confirm access f.read(1) status.append("File is readable") except PermissionError: return f"ERROR: Permission denied when trying to read {db_path}. Please grant Full Disk Access permission to your terminal application. PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE." except Exception as e: return f"ERROR: Unknown error reading file: {str(e)} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE." # Try to connect to the database try: conn = sqlite3.connect(db_path) status.append("Successfully connected to database") # Test a simple query cursor = conn.cursor() cursor.execute("SELECT count(*) FROM sqlite_master") count = cursor.fetchone()[0] status.append(f"Database contains {count} tables") # Check if the necessary tables exist cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('message', 'handle', 'chat')") tables = [row[0] for row in cursor.fetchall()] if 'message' in tables and 'handle' in tables: status.append("Required tables (message, handle) are present") else: status.append(f"WARNING: Some required tables are missing. Found: {', '.join(tables)}") conn.close() except sqlite3.OperationalError as e: return f"ERROR: Database connection error: {str(e)} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE." return "\n".join(status) except Exception as e: return f"ERROR: Unexpected error during database access check: {str(e)} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE." def find_handle_by_phone(phone: str) -> Optional[int]: """ Find a handle ID by phone number, trying various formats. Args: phone: Phone number in any format Returns: handle_id if found, None otherwise """ # Normalize the phone number (remove all non-digit characters) normalized = normalize_phone_number(phone) if not normalized: return None # Try various formats for US numbers formats_to_try = [normalized] # Start with the normalized input # For US numbers, try with and without country code if normalized.startswith('1') and len(normalized) > 10: # Try without the country code formats_to_try.append(normalized[1:]) elif len(normalized) == 10: # Try with the country code formats_to_try.append('1' + normalized) # Query for the handle ID using OR conditions placeholders = ', '.join(['?' for _ in formats_to_try]) query = f""" SELECT ROWID FROM handle WHERE id IN ({placeholders}) OR id IN ({placeholders}) """ # Create parameters list with both the raw formats and with "+" prefix params = formats_to_try + ['+' + f for f in formats_to_try] results = query_messages_db(query, tuple(params)) if not results or "error" in results[0]: return None if len(results) == 0: return None return results[0]["ROWID"] def check_addressbook_access() -> str: """Check if the AddressBook database is accessible and return detailed information.""" try: home_dir = os.path.expanduser("~") sources_path = os.path.join(home_dir, "Library/Application Support/AddressBook/Sources") status = [] # Check if the directory exists if not os.path.exists(sources_path): return f"ERROR: AddressBook Sources directory not found at {sources_path} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE." status.append(f"AddressBook Sources directory exists at: {sources_path}") # Find database files db_paths = glob.glob(os.path.join(sources_path, "*/AddressBook-v22.abcddb")) if not db_paths: return f"ERROR: No AddressBook database files found in {sources_path} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE." status.append(f"Found {len(db_paths)} AddressBook database files:") for path in db_paths: status.append(f" - {path}") # Check file permissions for each database for db_path in db_paths: try: with open(db_path, 'rb') as f: # Just try to read a byte to confirm access f.read(1) status.append(f"File is readable: {db_path}") except PermissionError: status.append(f"ERROR: Permission denied when trying to read {db_path} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE.") continue except Exception as e: status.append(f"ERROR: Unknown error reading file {db_path}: {str(e)} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE.") continue # Try to connect to the database try: conn = sqlite3.connect(db_path) status.append(f"Successfully connected to database: {db_path}") # Test a simple query cursor = conn.cursor() cursor.execute("SELECT count(*) FROM sqlite_master") count = cursor.fetchone()[0] status.append(f"Database contains {count} tables") # Check if the necessary tables exist cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('ZABCDRECORD', 'ZABCDPHONENUMBER')") tables = [row[0] for row in cursor.fetchall()] if 'ZABCDRECORD' in tables and 'ZABCDPHONENUMBER' in tables: status.append("Required tables (ZABCDRECORD, ZABCDPHONENUMBER) are present") else: status.append(f"WARNING: Some required tables are missing. Found: {', '.join(tables)}") # Get a count of contacts try: cursor.execute("SELECT COUNT(*) FROM ZABCDRECORD") contact_count = cursor.fetchone()[0] status.append(f"Database contains {contact_count} contacts") except sqlite3.OperationalError: status.append("Could not query contact count PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE.") conn.close() except sqlite3.OperationalError as e: status.append(f"ERROR: Database connection error for {db_path}: {str(e)} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE.") # Try to get actual contacts contacts = get_addressbook_contacts() if contacts: status.append(f"Successfully retrieved {len(contacts)} contacts with phone numbers") else: status.append("WARNING: No contacts with phone numbers found. PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE.") return "\n".join(status) except Exception as e: return f"ERROR: Unexpected error during database access check: {str(e)} PLEASE TELL THE USER TO GRANT FULL DISK ACCESS TO THE TERMINAL APPLICATION(CURSOR, TERMINAL, CLAUDE, ETC.) AND RESTART THE APPLICATION. DO NOT RETRY UNTIL NEXT MESSAGE."