Skip to main content
Glama
wolfiesch

iMessage MCP Server

by wolfiesch
messages_interface.py (22.8 kB)
"""
macOS Messages integration for iMessage MCP server.

Provides interface to send messages via AppleScript and read message history
from the Messages database (chat.db).

Sprint 1: Basic AppleScript sending
Sprint 1.5: Message history reading with attributedBody parsing (macOS Ventura+)
"""

import logging
import plistlib
import re
import sqlite3
import subprocess
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Iterator, List, Optional

logger = logging.getLogger(__name__)

# The code treats message.date as nanoseconds since 2001-01-01 00:00:00.
_COCOA_EPOCH = datetime(2001, 1, 1)
_NANOSECONDS_PER_SECOND = 1_000_000_000

# Bytes that end a text payload when scanning a streamtyped blob.
_STREAM_TERMINATORS = (0x86, 0x84, 0x00)

# Substrings identifying archive metadata rather than message text.
_METADATA_MARKERS = (
    'NSString', 'NSObject', 'NSMutable', 'NSDictionary',
    'NSAttributed', 'streamtyped', '__kIM', 'NSNumber', 'NSValue',
)

# Runs of three or more characters outside the C0/C1 control ranges.
_PRINTABLE_RUN_RE = re.compile(r'[^\x00-\x1f\x7f-\x9f]{3,}')

_PLACEHOLDER_TEXT = "[message content not available]"


def _iter_keyed_archive_strings(objects) -> Iterator[str]:
    """Yield candidate text strings from an NSKeyedArchiver ``$objects`` list."""
    for obj in objects:
        if isinstance(obj, str):
            # Skip class names ('NS...') and archiver metadata ('$...').
            if obj and not obj.startswith(('NS', '$')):
                yield obj
        elif isinstance(obj, dict):
            if 'NS.string' in obj:
                value = obj['NS.string']
                # The value may be a plistlib.UID reference instead of text;
                # only real strings are usable candidates.
                if isinstance(value, str):
                    yield value
            elif 'NS.bytes' in obj:
                raw = obj['NS.bytes']
                if isinstance(raw, (bytes, bytearray)):
                    try:
                        decoded = raw.decode('utf-8')
                    except UnicodeDecodeError:
                        continue
                    yield decoded


def parse_attributed_body(blob: bytes) -> Optional[str]:
    """
    Parse the attributedBody column from macOS Messages database.

    Handles only the binary-plist (NSKeyedArchiver) representation; blobs in
    the "streamtyped" format yield None so the caller can try other methods.

    Args:
        blob: Raw bytes from the attributedBody column

    Returns:
        Extracted text string or None if parsing fails
    """
    if not blob:
        return None

    try:
        bplist_start = blob.find(b'bplist')
        if bplist_start == -1:
            # Not a bplist format - let caller try other methods
            return None

        plist = plistlib.loads(blob[bplist_start:])
        if not isinstance(plist, dict):
            return None

        # NSKeyedArchiver format has $objects array containing the data
        if '$objects' in plist:
            for candidate in _iter_keyed_archive_strings(plist['$objects']):
                stripped = candidate.strip()
                if stripped:
                    # Return the first substantial text found
                    return stripped

        # Fallback: first top-level string value that is not a class name
        for value in plist.values():
            if isinstance(value, str) and value and not value.startswith('NS'):
                return value

        return None

    except Exception as e:
        # Best-effort parser: any malformed blob simply yields no text.
        logger.debug("Failed to parse attributedBody: %s", e)
        return None


def _read_length_prefixed(blob: bytes, pos: int) -> Optional[bytes]:
    """
    Read a length-prefixed payload starting at ``pos`` (the length field).

    Assumes the typedstream integer encoding: a single byte below 0x80 is the
    length itself, 0x81 introduces a 2-byte and 0x82 a 4-byte little-endian
    length. The result is only trusted when the payload is followed by a
    terminator byte (or the end of the blob); otherwise None is returned so
    the caller can fall back to scanning.
    """
    if pos >= len(blob):
        return None

    marker = blob[pos]
    if marker == 0x81:
        width = 2
    elif marker == 0x82:
        width = 4
    elif marker < 0x80:
        width = 0
    else:
        return None

    if width:
        size_field = blob[pos + 1:pos + 1 + width]
        if len(size_field) != width:
            return None
        length = int.from_bytes(size_field, 'little')
        start = pos + 1 + width
    else:
        length = marker
        start = pos + 1

    end = start + length
    if length == 0 or end > len(blob):
        return None
    if end < len(blob) and blob[end] not in _STREAM_TERMINATORS:
        return None
    return blob[start:end]


def _scan_to_terminator(blob: bytes, start: int, stop_at_int_tag: bool = False) -> bytes:
    """
    Return the bytes from ``start`` up to the first terminator byte.

    With ``stop_at_int_tag`` the scan also stops at an 'i' that is immediately
    followed by 'I' or 'N', which the original heuristic treats as the start
    of trailing metadata.
    """
    end = start
    size = len(blob)
    while end < size and blob[end] not in _STREAM_TERMINATORS:
        if (stop_at_int_tag and blob[end] == 0x69
                and end + 1 < size and blob[end + 1] in (0x49, 0x4e)):
            break
        end += 1
    return blob[start:end]


def _parse_streamtyped_nsstring(blob: bytes) -> Optional[str]:
    """Extract the text that follows the first NSString marker, if any."""
    marker_idx = blob.find(b'NSString')
    if marker_idx == -1:
        return None

    # The '+' tag must sit close to the marker to belong to this string.
    plus_idx = blob.find(b'+', marker_idx)
    if plus_idx == -1 or plus_idx >= marker_idx + 20:
        return None

    # Preferred: honour the declared length so that UTF-8 text containing
    # 0x84/0x86 bytes and payloads longer than 127 bytes are read intact.
    payload = _read_length_prefixed(blob, plus_idx + 1)
    if payload is not None:
        try:
            text = payload.decode('utf-8').strip()
        except UnicodeDecodeError:
            text = ''
        if text:
            return text

    # Legacy heuristic: skip '+' and one length byte, scan to a terminator.
    raw = _scan_to_terminator(blob, plus_idx + 2)
    return raw.decode('utf-8', errors='ignore').strip() or None


def _parse_streamtyped_mutable_string(blob: bytes) -> Optional[str]:
    """Extract text after an NSMutableString (or NSString) marker, if any."""
    marker_idx = blob.find(b'NSMutableString')
    if marker_idx == -1:
        marker_idx = blob.find(b'NSString')
    if marker_idx == -1:
        return None

    plus_idx = blob.find(b'+', marker_idx)
    if plus_idx == -1:
        return None

    raw = _scan_to_terminator(blob, plus_idx + 2, stop_at_int_tag=True)
    return raw.decode('utf-8', errors='ignore').strip() or None


def _first_printable_run(blob: bytes) -> Optional[str]:
    """Return the first printable run in the blob that is not metadata."""
    decoded = blob.decode('utf-8', errors='ignore')
    for run in _PRINTABLE_RUN_RE.findall(decoded):
        if any(marker in run for marker in _METADATA_MARKERS):
            continue
        cleaned = run.strip('+').strip()
        if cleaned and len(cleaned) >= 2:
            return cleaned
    return None


def extract_text_from_blob(blob: bytes) -> Optional[str]:
    """
    Extract readable text from a binary blob (attributedBody format).

    Strategies are tried in order and the first non-empty result wins:
      1. Binary plist (NSKeyedArchiver) via parse_attributed_body
      2. "streamtyped" text following the NSString marker
      3. "streamtyped" text following an NSMutableString marker
      4. First printable run in the blob that is not archive metadata

    Args:
        blob: Raw bytes from attributedBody column

    Returns:
        Extracted text or None
    """
    if not blob:
        return None

    result = parse_attributed_body(blob)
    if result:
        return result

    try:
        for strategy in (
            _parse_streamtyped_nsstring,
            _parse_streamtyped_mutable_string,
            _first_printable_run,
        ):
            text = strategy(blob)
            if text:
                return text
    except Exception as e:
        logger.debug("Error extracting text from blob: %s", e)

    return None


class MessagesInterface:
    """Interface to macOS Messages app."""

    def __init__(self, messages_db_path: str = "~/Library/Messages/chat.db"):
        """
        Initialize Messages interface.

        Args:
            messages_db_path: Path to Messages database (default: standard location)
        """
        self.messages_db_path = Path(messages_db_path).expanduser()
        logger.info("Initialized MessagesInterface with DB: %s", self.messages_db_path)

    @staticmethod
    def _escape_applescript(value: str) -> str:
        """
        Escape a value for use inside a double-quoted AppleScript string.

        Backslashes are escaped first; escaping quotes first would cause the
        backslash added for each quote to be doubled, leaving the quote
        unescaped and able to terminate the literal.
        """
        return value.replace('\\', '\\\\').replace('"', '\\"')

    @staticmethod
    def _cocoa_to_iso(date_cocoa) -> Optional[str]:
        """Convert a nanosecond Cocoa timestamp to an ISO 8601 string (or None)."""
        if not date_cocoa:
            return None
        moment = _COCOA_EPOCH + timedelta(seconds=date_cocoa / _NANOSECONDS_PER_SECOND)
        return moment.isoformat()

    @staticmethod
    def _resolve_text(text, attributed_body) -> Optional[str]:
        """Prefer the text column; otherwise decode attributedBody when present."""
        if not text and attributed_body:
            return extract_text_from_blob(attributed_body)
        return text

    def _connect(self) -> sqlite3.Connection:
        """Open the Messages database read-only."""
        return sqlite3.connect(f"file:{self.messages_db_path}?mode=ro", uri=True)

    def send_message(self, phone: str, message: str) -> dict:
        """
        Send an iMessage using AppleScript.

        Args:
            phone: Phone number or iMessage handle (email)
            message: Message text to send

        Returns:
            dict: {"success": bool, "error": Optional[str]}

        Example:
            result = interface.send_message("+14155551234", "Hello!")
        """
        logger.info("Sending message to %s", phone)

        # Both values are interpolated into AppleScript string literals, so
        # both must be escaped to keep them from breaking out of the literal.
        escaped_message = self._escape_applescript(message)
        escaped_phone = self._escape_applescript(phone)

        script = f'''
        tell application "Messages"
            set targetService to 1st account whose service type = iMessage
            set targetBuddy to participant "{escaped_phone}" of targetService
            send "{escaped_message}" to targetBuddy
        end tell
        '''

        try:
            result = subprocess.run(
                ['osascript', '-e', script],
                capture_output=True,
                text=True,
                timeout=10
            )

            if result.returncode == 0:
                logger.info("Message sent successfully to %s", phone)
                return {"success": True, "error": None}

            error_msg = result.stderr.strip()
            logger.error("Failed to send message: %s", error_msg)
            return {"success": False, "error": error_msg}

        except subprocess.TimeoutExpired:
            logger.error("AppleScript timeout - Messages.app may not be running")
            return {
                "success": False,
                "error": "Timeout - ensure Messages.app is running"
            }
        except Exception as e:
            logger.error("Exception sending message: %s", e)
            return {"success": False, "error": str(e)}

    def get_recent_messages(
        self,
        phone: str,
        limit: int = 20
    ) -> List[Dict]:
        """
        Retrieve recent messages with a contact from Messages database.

        Args:
            phone: Phone number or iMessage handle (substring match on handle.id)
            limit: Number of recent messages to retrieve

        Returns:
            List[Dict]: List of message dicts (newest first) with keys:
                - text: Message content
                - date: ISO 8601 timestamp or None
                - is_from_me: Boolean (sent vs received)
            An empty list is returned when the database is missing or on error.

        Note:
            Requires Full Disk Access permission for ~/Library/Messages/chat.db
            Includes attributedBody parsing for macOS Ventura+
        """
        logger.info("Retrieving recent messages for %s", phone)

        if not self.messages_db_path.exists():
            logger.error("Messages database not found: %s", self.messages_db_path)
            return []

        query = """
            SELECT
                message.text,
                message.attributedBody,
                message.date,
                message.is_from_me
            FROM message
            JOIN handle ON message.handle_id = handle.ROWID
            WHERE handle.id LIKE ?
            ORDER BY message.date DESC
            LIMIT ?
        """

        try:
            # closing() guarantees the connection is released even on error.
            with closing(self._connect()) as conn:
                rows = conn.execute(query, (f"%{phone}%", limit)).fetchall()

            messages = [
                {
                    "text": self._resolve_text(text, attributed_body) or _PLACEHOLDER_TEXT,
                    "date": self._cocoa_to_iso(date_cocoa),
                    "is_from_me": bool(is_from_me),
                }
                for text, attributed_body, date_cocoa, is_from_me in rows
            ]

            logger.info("Retrieved %d messages", len(messages))
            return messages

        except sqlite3.Error as e:
            logger.error("Database error: %s", e)
            return []
        except Exception as e:
            logger.error("Error retrieving messages: %s", e)
            return []

    def check_permissions(self) -> dict:
        """
        Check if required permissions are granted.

        Only the existence of the database file is checked; AppleScript
        readiness is reported as True unconditionally and will surface as a
        failure on the first send if automation access was not granted.

        Returns:
            dict: {"messages_db_accessible": bool, "applescript_ready": bool}
        """
        permissions = {
            "messages_db_accessible": self.messages_db_path.exists(),
            "applescript_ready": True  # Will fail on first send if not granted
        }

        if not permissions["messages_db_accessible"]:
            logger.warning(
                "Messages database not accessible. "
                "Grant Full Disk Access: System Settings → Privacy & Security"
            )

        return permissions

    def get_all_recent_conversations(self, limit: int = 20) -> List[Dict]:
        """
        Get recent messages from ALL conversations (not filtered by contact).

        Sprint 2.5: Returns recent messages across all contacts, including
        unknown numbers and people not in your contacts.

        Args:
            limit: Number of recent messages to retrieve

        Returns:
            List[Dict]: List of message dicts (newest first) with keys:
                - text: Message content
                - date: ISO 8601 timestamp or None
                - is_from_me: Boolean (sent vs received)
                - phone: Phone number or handle, "unknown" when absent
                - contact_name: Always None here; filled in by the caller
            An empty list is returned when the database is missing or on error.

        Example:
            messages = interface.get_all_recent_conversations(limit=50)
        """
        logger.info("Retrieving %s most recent messages from all conversations", limit)

        if not self.messages_db_path.exists():
            logger.error("Messages database not found: %s", self.messages_db_path)
            return []

        query = """
            SELECT
                message.text,
                message.attributedBody,
                message.date,
                message.is_from_me,
                handle.id
            FROM message
            LEFT JOIN handle ON message.handle_id = handle.ROWID
            ORDER BY message.date DESC
            LIMIT ?
        """

        try:
            with closing(self._connect()) as conn:
                rows = conn.execute(query, (limit,)).fetchall()

            messages = [
                {
                    "text": self._resolve_text(text, attributed_body) or _PLACEHOLDER_TEXT,
                    "date": self._cocoa_to_iso(date_cocoa),
                    "is_from_me": bool(is_from_me),
                    "phone": handle_id or "unknown",
                    "contact_name": None  # Will be populated by MCP tool if contact exists
                }
                for text, attributed_body, date_cocoa, is_from_me, handle_id in rows
            ]

            logger.info("Retrieved %d messages from all conversations", len(messages))
            return messages

        except sqlite3.Error as e:
            logger.error("Database error: %s", e)
            return []
        except Exception as e:
            logger.error("Error retrieving messages: %s", e)
            return []

    def search_messages(
        self,
        query: str,
        phone: Optional[str] = None,
        limit: int = 50
    ) -> List[Dict]:
        """
        Search messages by content/keyword.

        Sprint 2.5: Case-insensitive substring search across all messages or
        filtered by contact. Rows whose text lives only in attributedBody are
        decoded and matched in Python, so ``limit`` is applied to the number
        of actual matches rather than to the number of rows scanned.

        Args:
            query: Search query (keyword or phrase)
            phone: Optional phone number to filter by specific contact
            limit: Maximum number of results (negative means unlimited)

        Returns:
            List[Dict]: List of matching message dicts (newest first) with keys:
                - text: Message content
                - date: ISO 8601 timestamp or None
                - is_from_me: Boolean
                - phone: Phone number or handle, "unknown" when absent
                - match_snippet: Text snippet showing the match
            An empty list is returned when the database is missing or on error.

        Example:
            # Search all messages
            results = interface.search_messages("dinner plans")

            # Search messages with specific contact
            results = interface.search_messages("dinner", phone="+14155551234")
        """
        logger.info(
            "Searching messages for query: '%s'%s",
            query,
            f" (contact: {phone})" if phone else " (all contacts)",
        )

        if not self.messages_db_path.exists():
            logger.error("Messages database not found: %s", self.messages_db_path)
            return []

        if phone:
            sql_query = """
                SELECT
                    message.text,
                    message.attributedBody,
                    message.date,
                    message.is_from_me,
                    handle.id
                FROM message
                JOIN handle ON message.handle_id = handle.ROWID
                WHERE (message.text LIKE ? OR message.attributedBody IS NOT NULL)
                    AND handle.id LIKE ?
                ORDER BY message.date DESC
            """
            params = (f"%{query}%", f"%{phone}%")
        else:
            sql_query = """
                SELECT
                    message.text,
                    message.attributedBody,
                    message.date,
                    message.is_from_me,
                    handle.id
                FROM message
                LEFT JOIN handle ON message.handle_id = handle.ROWID
                WHERE message.text LIKE ? OR message.attributedBody IS NOT NULL
                ORDER BY message.date DESC
            """
            params = (f"%{query}%",)

        needle = query.lower()
        messages: List[Dict] = []

        try:
            with closing(self._connect()) as conn:
                # Iterate lazily so scanning stops once enough matches exist.
                for text, attributed_body, date_cocoa, is_from_me, handle_id in conn.execute(sql_query, params):
                    if 0 <= limit <= len(messages):
                        break

                    message_text = self._resolve_text(text, attributed_body)

                    # Skip if no text content (attachments only, etc.)
                    if not message_text:
                        continue

                    # The SQL filter admits every row with an attributedBody,
                    # so the real match test happens here.
                    if needle not in message_text.lower():
                        continue

                    messages.append({
                        "text": message_text,
                        "date": self._cocoa_to_iso(date_cocoa),
                        "is_from_me": bool(is_from_me),
                        "phone": handle_id or "unknown",
                        "match_snippet": self._create_snippet(message_text, query)
                    })

            logger.info("Found %d messages matching '%s'", len(messages), query)
            return messages

        except sqlite3.Error as e:
            logger.error("Database error: %s", e)
            return []
        except Exception as e:
            logger.error("Error searching messages: %s", e)
            return []

    def _create_snippet(self, text: str, query: str, context_chars: int = 50) -> str:
        """
        Create a text snippet showing the search query in context.

        Args:
            text: Full message text
            query: Search query
            context_chars: Characters to show before/after match

        Returns:
            The matched region with up to ``context_chars`` characters on each
            side, with "..." added on any side that was truncated. If the
            query is not found, the first 100 characters of the text.
        """
        try:
            # Find query in text (case-insensitive)
            match_pos = text.lower().find(query.lower())

            if match_pos == -1:
                # No match (shouldn't happen), return start of text
                return text[:100] + "..." if len(text) > 100 else text

            start = max(0, match_pos - context_chars)
            end = min(len(text), match_pos + len(query) + context_chars)

            prefix = "..." if start > 0 else ""
            suffix = "..." if end < len(text) else ""
            return f"{prefix}{text[start:end]}{suffix}"

        except Exception as e:
            logger.debug("Error creating snippet: %s", e)
            return text[:100]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wolfiesch/imessage-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server