"""Parser for iMessage attributedBody binary format.

On recent macOS versions, message text is stored in the `attributedBody`
column as a binary typedstream blob, rather than as plain text in the `text`
column. This module provides parsing for that format.

The attributedBody format is an Apple-proprietary binary serialization of
NSAttributedString. The text content is embedded after specific markers.

Format structure (reverse-engineered):

    streamtyped header
    ...
    NSString marker
    type markers (0x01, 0x94, 0x84, etc.)
    '+' marker (0x2B)
    length byte (or 0x81 followed by a 2-byte little-endian length for
        strings longer than 127 bytes)
    UTF-8 text content
    ...
"""
import sqlite3
from typing import Optional, Union
def parse_attributed_body(blob: Optional[bytes]) -> Optional[str]:
    """Parse the attributedBody binary format to extract message text.

    The primary strategy locates the ``NSString`` marker, looks for the
    ``+`` (0x2B) marker within the next 20 bytes, and reads the
    length-prefixed UTF-8 string that follows it. When that yields nothing
    usable (no ``+`` marker, a length that overruns the blob, bytes that
    are not valid UTF-8, or text rejected by ``_is_message_text``) the
    function defers to ``_fallback_parse``.

    Args:
        blob: The raw attributedBody bytes from the database.

    Returns:
        Extracted message text, or None if the blob is empty, contains no
        ``NSString`` marker, ends right after the ``+`` marker, or no
        message text could be recovered.
    """
    if not blob:
        return None

    try:
        # The text content follows the first NSString marker.
        ns_idx = blob.find(b"NSString")
        if ns_idx == -1:
            return None
        search_start = ns_idx + len(b"NSString")

        # The '+' marker (0x2B) precedes the length byte and is expected
        # within ~20 bytes after NSString. find() returns an absolute index.
        plus_idx = blob.find(b"+", search_start, search_start + 20)

        if plus_idx != -1:
            len_offset = plus_idx + 1
            if len_offset >= len(blob):
                # Blob ends right after '+': there is nothing left to read.
                return None

            length = blob[len_offset]
            if length == 0x81 and len_offset + 2 < len(blob):
                # 0x81 announces a little-endian 2-byte length.
                length = blob[len_offset + 1] | (blob[len_offset + 2] << 8)
                text_start = len_offset + 3
            else:
                text_start = len_offset + 1

            text_end = text_start + length
            if length > 0 and text_end <= len(blob):
                try:
                    text = blob[text_start:text_end].decode("utf-8")
                except UnicodeDecodeError:
                    # A stray '+' or a misread length gives undecodable
                    # bytes. That is a primary-path failure, not a reason
                    # to give up, so fall through to the fallback scan.
                    text = None
                if text is not None and _is_message_text(text):
                    return text

        # Primary method found nothing usable: scan for candidates instead.
        return _fallback_parse(blob, search_start)
    except (UnicodeDecodeError, IndexError, ValueError):
        return None
def _fallback_parse(blob: bytes, search_start: int) -> Optional[str]:
    """Scan a window of the blob for length-prefixed UTF-8 strings.

    Used when the standard marker layout is not found. Every byte in the
    window (at most 200 bytes from ``search_start``, stopping two bytes
    before the end of the blob) is tried as a possible length prefix.
    Candidates that decode as UTF-8 and pass ``_is_message_text`` are
    collected.

    Args:
        blob: The raw attributedBody bytes.
        search_start: Position to start scanning from.

    Returns:
        The longest accepted candidate (by character count), or None if no
        candidate was accepted.
    """
    type_markers = (0x84, 0x94, 0x01, 0x92, 0x85)
    blob_len = len(blob)
    scan_end = min(blob_len - 2, search_start + 200)
    found: list[tuple[int, str]] = []

    def consider(raw: bytes) -> None:
        # Record raw as a candidate if it is valid UTF-8 message text.
        try:
            decoded = raw.decode("utf-8")
        except UnicodeDecodeError:
            return
        if _is_message_text(decoded):
            found.append((len(decoded), decoded))

    cursor = search_start
    while cursor < scan_end:
        current = blob[cursor]

        if current in type_markers:
            # Known type markers never act as a length prefix.
            cursor += 1
        elif current == 0x81 and cursor + 3 <= blob_len:
            # 0x81 announces a little-endian 2-byte length.
            size = blob[cursor + 1] + (blob[cursor + 2] << 8)
            payload_end = cursor + 3 + size
            if 1 <= size <= 5000 and payload_end <= blob_len:
                consider(blob[cursor + 3 : payload_end])
            # Step over the prefix only; the payload is still scanned.
            cursor += 3
        else:
            # Any other byte in 1..127 may be a 1-byte length prefix.
            if 1 <= current <= 127 and cursor + 1 + current <= blob_len:
                consider(blob[cursor + 1 : cursor + 1 + current])
            cursor += 1

    # Tuples compare by length first, so max() picks the longest candidate.
    return max(found)[1] if found else None
def _is_message_text(text: str) -> bool:
    """Decide whether a decoded string looks like real message content.

    Rejects empty strings, strings containing any known class-name or
    metadata marker (matched case-sensitively as substrings), and strings
    in which fewer than 70% of the characters are printable or common
    whitespace.

    Args:
        text: Text to validate.

    Returns:
        True if text appears to be message content.
    """
    if not text:
        return False

    # Class names and metadata keys that show up inside the serialized blob.
    metadata_markers = (
        "NSString",
        "NSDictionary",
        "NSArray",
        "NSObject",
        "NSMutableString",
        "NSAttributedString",
        "NSNumber",
        "NSValue",
        "streamtyped",
        "bplist",
        "__kIM",
    )
    if any(marker in text for marker in metadata_markers):
        return False

    # Require that most characters are printable (newlines and tabs count).
    readable = sum(ch.isprintable() or ch in "\n\r\t " for ch in text)
    return readable >= len(text) * 0.7
def parse_message_text(row: Union[sqlite3.Row, dict]) -> Optional[str]:
    """Extract message text from a database row.

    Main entry point for message text extraction. The lookup order is:

    1. The `text` column, when present and non-empty (older macOS).
    2. The `attributedBody` column, parsed with ``parse_attributed_body``
       (recent macOS).
    3. None for messages without text (e.g., media-only).

    Args:
        row: A database row with 'text' and 'attributedBody' columns.
            Either column may be missing.

    Returns:
        Message text, or None if no text content.
    """
    columns = row.keys()

    # Plain text wins whenever it is available and non-empty.
    if "text" in columns:
        plain = row["text"]
        if plain:
            return plain

    # Otherwise fall back to the serialized attributed string, if any.
    if "attributedBody" not in columns:
        return None
    encoded = row["attributedBody"]
    if not encoded:
        return None
    return parse_attributed_body(encoded)