"""Direct disk reading of Apple Mail .emlx files.
This module reads emails directly from ~/Library/Mail/V10/ for fast indexing.
Requires Full Disk Access permission for the terminal.
Mail.app storage structure:
~/Library/Mail/V10/
├── [Account-UUID]/
│ └── [Mailbox].mbox/
│ └── Data/x/y/Messages/
│ ├── 12345.emlx
│ └── 12346.emlx
└── MailData/
└── Envelope Index # SQLite with metadata
.emlx file format:
1255 ← Byte count of MIME content
From: sender@example.com ← RFC 5322 headers + body
Subject: Hello
...
<?xml version="1.0"?> ← Plist metadata footer
<plist>...</plist>
"""
from __future__ import annotations
import email
import logging
import mimetypes
import re
import sqlite3
import warnings
from dataclasses import dataclass
from email.header import decode_header, make_header
from pathlib import Path
from typing import TYPE_CHECKING
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from collections.abc import Iterator
# Mail.app version folder (V10 for macOS Catalina+)
MAIL_VERSION = "V10"
def extract_message_id(path: Path) -> int:
"""Extract the numeric message ID from an .emlx filename.
Handles both regular (``12345.emlx``) and partial
(``12345.partial.emlx``) filenames by splitting on the first dot.
Args:
path: Path to an .emlx file
Returns:
Integer message ID
Raises:
ValueError: If the filename does not start with a number
"""
return int(path.name.split(".")[0])
# Maximum email file size to prevent OOM from malformed/huge files (25 MB)
MAX_EMLX_SIZE = 25 * 1024 * 1024
@dataclass
class AttachmentInfo:
"""Metadata for a single email attachment."""
filename: str
mime_type: str
file_size: int
content_id: str | None
@dataclass
class EmlxEmail:
"""Parsed email from .emlx file."""
id: int
subject: str
sender: str
content: str
date_received: str
emlx_path: Path
attachments: list[AttachmentInfo] | None = None
def find_mail_directory() -> Path:
"""
Find the Apple Mail data directory.
Returns:
Path to ~/Library/Mail/V10/
Raises:
FileNotFoundError: If directory doesn't exist
PermissionError: If Full Disk Access is not granted
"""
mail_dir = Path.home() / "Library" / "Mail" / MAIL_VERSION
if not mail_dir.exists():
raise FileNotFoundError(
f"Mail directory not found: {mail_dir}\n"
"Ensure Apple Mail has been used on this Mac."
)
# Test access by trying to list contents
try:
next(mail_dir.iterdir(), None)
except PermissionError as e:
raise PermissionError(
f"Cannot access {mail_dir}\n"
"Grant Full Disk Access to Terminal:\n"
" System Settings → Privacy & Security → Full Disk Access"
) from e
return mail_dir
def find_envelope_index(mail_dir: Path) -> Path:
"""
Find the Envelope Index SQLite database.
Args:
mail_dir: Path to ~/Library/Mail/V10/
Returns:
Path to the Envelope Index database
Raises:
FileNotFoundError: If database not found
"""
# The Envelope Index is in MailData directory
envelope_path = mail_dir.parent / "MailData" / "Envelope Index"
if not envelope_path.exists():
raise FileNotFoundError(
f"Envelope Index not found: {envelope_path}\n"
"Ensure Apple Mail has synced email."
)
return envelope_path
def read_envelope_index(mail_dir: Path) -> dict[int, dict]:
"""
Read the Envelope Index database to get message metadata.
The Envelope Index contains:
- Message IDs and their file paths
- Account and mailbox information
- Basic metadata (subject, sender, dates)
Args:
mail_dir: Path to ~/Library/Mail/V10/
Returns:
Dict mapping message ID to metadata dict with:
- account: Account name
- mailbox: Mailbox name
- emlx_path: Path to .emlx file (relative)
- subject: Email subject
- sender: Sender address
- date_received: ISO date string
"""
envelope_path = find_envelope_index(mail_dir)
# Connect in read-only mode to avoid locking issues
conn = sqlite3.connect(f"file:{envelope_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
result: dict[int, dict] = {}
try:
# Query the messages table joined with mailboxes
# Schema varies by macOS version, so we use a flexible approach
cursor = conn.execute("""
SELECT
m.ROWID as id,
m.subject,
m.sender,
m.date_received,
m.mailbox as mailbox_id,
mb.url as mailbox_url
FROM messages m
LEFT JOIN mailboxes mb ON m.mailbox = mb.ROWID
ORDER BY m.date_received DESC
""")
for row in cursor:
msg_id = row["id"]
# Parse mailbox URL to get account and mailbox name
# Format: mailbox://[account-uuid]/[mailbox-name]
mailbox_url = row["mailbox_url"] or ""
account, mailbox = _parse_mailbox_url(mailbox_url)
result[msg_id] = {
"account": account,
"mailbox": mailbox,
"subject": row["subject"] or "",
"sender": row["sender"] or "",
"date_received": _format_timestamp(row["date_received"]),
}
except sqlite3.OperationalError as e:
# Schema might be different, try alternative approach
if "no such table" in str(e).lower():
# Fallback to scanning .emlx files directly
pass
else:
raise
finally:
conn.close()
return result
def _parse_mailbox_url(url: str) -> tuple[str, str]:
"""
Parse a mailbox URL to extract account and mailbox names.
Args:
url: mailbox://account-uuid/mailbox-name
Returns:
(account_name, mailbox_name) tuple
"""
if not url:
return ("Unknown", "Unknown")
# Remove mailbox:// prefix
path = url.replace("mailbox://", "")
# Split by /
parts = path.split("/", 1)
if len(parts) >= 2:
account = parts[0] or "Unknown"
mailbox = parts[1] or "Unknown"
return (account, mailbox)
return (parts[0] if parts else "Unknown", "Unknown")
def _format_timestamp(timestamp: float | int | None) -> str:
"""Convert Core Data timestamp to ISO string."""
if timestamp is None:
return ""
# Core Data timestamps are seconds since Jan 1, 2001
# Convert to Unix timestamp (seconds since Jan 1, 1970)
import datetime
CORE_DATA_EPOCH = 978307200 # Jan 1, 2001 in Unix time
try:
unix_ts = timestamp + CORE_DATA_EPOCH
dt = datetime.datetime.fromtimestamp(unix_ts, tz=datetime.UTC)
return dt.isoformat()
except (OSError, ValueError, OverflowError):
return ""
def parse_emlx(path: Path) -> EmlxEmail | None:
"""
Parse a single .emlx file.
.emlx format:
1. First line: byte count of MIME content
2. MIME message (RFC 5322)
3. XML plist footer with Apple metadata
Args:
path: Path to .emlx file
Returns:
EmlxEmail with parsed content, or None if parsing fails
"""
try:
# Check file size to prevent OOM from huge/malformed files
if path.stat().st_size > MAX_EMLX_SIZE:
return None
content = path.read_bytes()
# Find the byte count on first line
newline_idx = content.find(b"\n")
if newline_idx == -1:
return None
try:
byte_count = int(content[:newline_idx].strip())
except ValueError:
return None
# Extract MIME content
mime_start = newline_idx + 1
mime_end = mime_start + byte_count
mime_content = content[mime_start:mime_end]
# Parse MIME message
msg = email.message_from_bytes(mime_content)
# Extract subject with proper decoding
subject = ""
if msg["Subject"]:
try:
subject = str(make_header(decode_header(msg["Subject"])))
except (UnicodeDecodeError, LookupError):
subject = msg["Subject"] or ""
# Extract sender
sender = msg["From"] or ""
if sender:
try:
sender = str(make_header(decode_header(sender)))
except (UnicodeDecodeError, LookupError):
pass
# Extract date and convert from RFC 2822 to ISO 8601
date_received = ""
if msg["Date"]:
try:
from email.utils import parsedate_to_datetime
dt = parsedate_to_datetime(msg["Date"])
date_received = dt.isoformat()
except (ValueError, TypeError):
date_received = msg["Date"]
# Extract body text
body = _extract_body_text(msg)
# Extract attachment metadata
attachments = _extract_attachments(msg, emlx_path=path)
# Extract message ID from filename (handles .partial.emlx)
msg_id = extract_message_id(path)
return EmlxEmail(
id=msg_id,
subject=subject,
sender=sender,
content=body,
date_received=date_received,
emlx_path=path,
attachments=attachments or None,
)
except (OSError, ValueError, UnicodeDecodeError, LookupError):
# Skip malformed files
return None
def _extract_body_text(msg: email.message.Message) -> str:
"""
Extract plain text body from email message.
Handles multipart messages, preferring text/plain over text/html.
"""
if msg.is_multipart():
text_parts = []
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
try:
decoded = payload.decode(charset, errors="replace")
text_parts.append(decoded)
except (UnicodeDecodeError, LookupError):
decoded = payload.decode("utf-8", errors="replace")
text_parts.append(decoded)
if text_parts:
return "\n".join(text_parts)
# Fallback to HTML if no plain text
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/html":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
try:
html = payload.decode(charset, errors="replace")
return _strip_html(html)
except (UnicodeDecodeError, LookupError):
pass
return ""
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
try:
text = payload.decode(charset, errors="replace")
if msg.get_content_type() == "text/html":
return _strip_html(text)
return text
except (UnicodeDecodeError, LookupError):
return payload.decode("utf-8", errors="replace")
return ""
def _strip_html(html: str) -> str:
"""
Robust HTML to text conversion using BeautifulSoup.
Uses a proper HTML parser instead of regex to prevent XSS bypass
attacks from malformed HTML like <<script> or nested tags.
"""
try:
from bs4 import BeautifulSoup, XMLParsedAsHTMLWarning
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
soup = BeautifulSoup(html, "html.parser")
# Remove script and style elements completely
for element in soup(["script", "style"]):
element.decompose()
# Get text with newlines as separators
text = soup.get_text(separator="\n", strip=True)
# Collapse multiple newlines
text = re.sub(r"\n\s*\n", "\n\n", text)
text = re.sub(r" +", " ", text)
return text.strip()
except Exception:
# Fallback: return empty string if parsing fails entirely.
# Covers ParserRejectedMarkup from malformed HTML and
# any other parser errors that shouldn't crash the scan.
return ""
def _estimate_attachment_size(part: email.message.Message) -> int:
"""Estimate decoded attachment size without full MIME decode.
Avoids allocating the full decoded binary during indexing by
computing the size from the encoded payload and transfer encoding.
Strategy:
1. Use ``Content-Length`` header if present (rare but exact).
2. Compute from encoded payload length and encoding type:
- base64 → ``(clean_len * 3) // 4``
- quoted-printable / 7bit / 8bit → encoded length (≈ decoded)
3. Fallback to 0.
"""
# 1. Explicit Content-Length header (rare but exact)
cl = part.get("Content-Length")
if cl:
try:
return int(cl)
except (ValueError, TypeError):
pass
# 2. Compute from encoded payload
raw = part.get_payload(decode=False)
if not raw or not isinstance(raw, str):
return 0
encoding = (part.get("Content-Transfer-Encoding") or "").lower().strip()
if encoding == "base64":
# Strip whitespace to get the clean base64 length
stripped = raw.replace("\n", "").replace("\r", "")
clean_len = len(stripped.replace(" ", ""))
if clean_len == 0:
return 0
# Standard base64 ratio: 3 decoded bytes per 4 encoded chars
# Account for padding
padding = raw.rstrip().count("=") if raw.rstrip().endswith("=") else 0
return (clean_len * 3) // 4 - padding
else:
# QP, 7bit, 8bit — encoded length ≈ decoded length
return len(raw)
def _find_external_attachment(
emlx_path: Path,
msg_id: int,
part_idx: int,
filename: str,
) -> Path | None:
"""Find an externally-stored attachment on disk.
Apple Mail stores external attachments for ``.partial.emlx``
files in a sibling ``Attachments`` directory::
.../Data/9/4/Messages/49461.partial.emlx
.../Data/9/4/Attachments/49461/2/file.jpeg
Args:
emlx_path: Path to the ``.emlx`` file.
msg_id: Numeric message ID extracted from *emlx_path*.
part_idx: 1-based attachment part index (matches the
subdirectory name under ``Attachments/<msg_id>/``).
filename: Target filename to look for.
Returns:
Path to the external file, or ``None`` if not found.
"""
# Navigate: Messages/ -> parent -> Attachments/<msg_id>/
attachments_dir = emlx_path.parent.parent / "Attachments" / str(msg_id)
if not attachments_dir.is_dir():
return None
# Part sub-directories are 1-based: 2/, 3/, 4/, …
# The part_idx we receive is already 1-based.
part_dir = attachments_dir / str(part_idx)
if not part_dir.is_dir():
return None
# Strategy 1: exact filename match
# Guard against path traversal from untrusted MIME filenames
# (e.g. filename="../../etc/passwd")
candidate = part_dir / filename
try:
if not candidate.resolve().is_relative_to(part_dir.resolve()):
return None
except (ValueError, OSError):
return None
if candidate.is_file():
return candidate
# Strategy 2: take the single file in the subdirectory
# (each part subdir has exactly one file, sometimes with
# a generic name like "Mail Attachment.jpeg").
try:
files = [f for f in part_dir.iterdir() if f.is_file()]
except OSError:
return None
if len(files) == 1:
return files[0]
return None
def _extract_attachments(
msg: email.message.Message,
*,
emlx_path: Path | None = None,
) -> list[AttachmentInfo]:
"""Extract attachment metadata from an email message.
Walks MIME parts and collects non-inline, non-text parts
(or inline parts with Content-ID, i.e. embedded images).
When *emlx_path* is provided and the estimated size is 0
(common for ``.partial.emlx`` with external attachments),
the function tries to stat the external file to get an
accurate size.
Args:
msg: Parsed email message
emlx_path: Optional path to the ``.emlx`` file on
disk, used to locate external attachments.
Returns:
List of AttachmentInfo with filename, mime_type,
size, content_id
"""
attachments: list[AttachmentInfo] = []
if not msg.is_multipart():
return attachments
# Resolve msg_id once if we might need external lookup
msg_id: int | None = None
if emlx_path is not None:
try:
msg_id = extract_message_id(emlx_path)
except ValueError:
pass
attachment_part_idx = 0
for part in msg.walk():
content_type = part.get_content_type()
disposition = str(part.get("Content-Disposition") or "")
# Skip multipart containers and plain text/html body
if part.get_content_maintype() == "multipart":
continue
if content_type in ("text/plain", "text/html") and (
"attachment" not in disposition.lower()
):
continue
filename = part.get_filename() or ""
if not filename and "attachment" not in disposition.lower():
continue
# 1-based index matching Attachments/ subdirs
attachment_part_idx += 1
file_size = _estimate_attachment_size(part)
# Fallback: stat external file for .partial.emlx
if file_size == 0 and emlx_path is not None and msg_id is not None:
# Part subdirs start at 2 for the first
# attachment (1 is typically the body part)
ext = _find_external_attachment(
emlx_path,
msg_id,
attachment_part_idx + 1,
filename,
)
if ext is not None:
try:
file_size = ext.stat().st_size
except OSError:
pass
content_id = part.get("Content-ID")
if content_id:
# Strip angle brackets: <cid123> → cid123
content_id = content_id.strip("<>")
attachments.append(
AttachmentInfo(
filename=filename,
mime_type=content_type,
file_size=file_size,
content_id=content_id,
)
)
return attachments
def get_attachment_content(
emlx_path: Path, target_filename: str
) -> tuple[bytes, str] | None:
"""
Extract a specific attachment's content from an .emlx file.
Args:
emlx_path: Path to the .emlx file
target_filename: Filename of the attachment to extract
Returns:
(raw_bytes, mime_type) tuple, or None if not found
"""
try:
if not emlx_path.exists():
return None
if emlx_path.stat().st_size > MAX_EMLX_SIZE:
return None
content = emlx_path.read_bytes()
newline_idx = content.find(b"\n")
if newline_idx == -1:
return None
byte_count = int(content[:newline_idx].strip())
mime_start = newline_idx + 1
mime_end = mime_start + byte_count
msg = email.message_from_bytes(content[mime_start:mime_end])
# Walk MIME parts, tracking attachment index for
# external-file fallback.
attachment_part_idx = 0
for part in msg.walk():
ct = part.get_content_type()
disp = str(part.get("Content-Disposition") or "")
if part.get_content_maintype() == "multipart":
continue
if ct in ("text/plain", "text/html") and (
"attachment" not in disp.lower()
):
continue
fname = part.get_filename() or ""
if not fname and "attachment" not in disp.lower():
continue
attachment_part_idx += 1
if fname != target_filename:
continue
# Primary path: embedded MIME payload
payload = part.get_payload(decode=True)
if payload:
return (payload, ct)
# Fallback: external file on disk
result = _read_external_attachment(
emlx_path,
attachment_part_idx,
target_filename,
)
if result is not None:
return result
return None
except (OSError, ValueError, UnicodeDecodeError):
return None
def _read_external_attachment(
emlx_path: Path,
attachment_part_idx: int,
target_filename: str,
) -> tuple[bytes, str] | None:
"""Read an external attachment file from disk.
Helper for :func:`get_attachment_content` that locates
and reads the external file stored alongside a
``.partial.emlx``.
Args:
emlx_path: Path to the ``.emlx`` file.
attachment_part_idx: 1-based attachment index.
target_filename: Filename to find.
Returns:
``(bytes, mime_type)`` or ``None``.
"""
try:
msg_id = extract_message_id(emlx_path)
except ValueError:
return None
ext_path = _find_external_attachment(
emlx_path,
msg_id,
attachment_part_idx + 1,
target_filename,
)
if ext_path is None:
return None
try:
if ext_path.stat().st_size > MAX_EMLX_SIZE:
return None
data = ext_path.read_bytes()
except (OSError, PermissionError):
return None
mime_type, _ = mimetypes.guess_type(ext_path.name)
if mime_type is None:
mime_type = "application/octet-stream"
return (data, mime_type)
def scan_emlx_files(
mail_dir: Path,
exclude_mailboxes: set[str] | None = None,
) -> Iterator[Path]:
"""
Find all .emlx files in the Mail directory.
Args:
mail_dir: Path to ~/Library/Mail/V10/
exclude_mailboxes: Mailbox names to skip (e.g. {"Drafts"}).
Uses APPLE_MAIL_INDEX_EXCLUDE_MAILBOXES config if None.
Yields:
Paths to .emlx files
"""
if exclude_mailboxes is None:
from ..config import get_index_exclude_mailboxes
exclude_mailboxes = get_index_exclude_mailboxes()
# .emlx files are in: account-uuid/mailbox.mbox/Data/x/y/Messages/
for emlx_path in mail_dir.rglob("*.emlx"):
# Skip excluded mailboxes by checking .mbox dir name
if exclude_mailboxes:
parts = emlx_path.relative_to(mail_dir).parts
if len(parts) > 1:
mbox_dir = parts[1]
mbox_name = (
mbox_dir[:-5] if mbox_dir.endswith(".mbox") else mbox_dir
)
if mbox_name in exclude_mailboxes:
continue
yield emlx_path
def scan_all_emails(mail_dir: Path) -> Iterator[dict]:
"""
Scan all emails from the Mail directory.
This combines the Envelope Index metadata with .emlx file content
for comprehensive email data.
Args:
mail_dir: Path to ~/Library/Mail/V10/
Yields:
Email dicts with: id, account, mailbox, subject, sender,
content, date_received, emlx_path
"""
# First, try to read metadata from Envelope Index
try:
metadata = read_envelope_index(mail_dir)
except (FileNotFoundError, sqlite3.Error):
metadata = {}
# Scan .emlx files and combine with metadata
for emlx_path in scan_emlx_files(mail_dir):
try:
parsed = parse_emlx(emlx_path)
except Exception as e:
logger.warning("Skipping corrupt file %s: %s", emlx_path, e)
continue
if not parsed:
continue
msg_id = parsed.id
# Get metadata from Envelope Index if available
meta = metadata.get(msg_id, {})
# Infer account/mailbox from path if not in metadata
if not meta:
account, mailbox = _infer_account_mailbox(emlx_path, mail_dir)
meta = {"account": account, "mailbox": mailbox}
yield {
"id": msg_id,
"account": meta.get("account", "Unknown"),
"mailbox": meta.get("mailbox", "Unknown"),
"subject": parsed.subject or meta.get("subject", ""),
"sender": parsed.sender or meta.get("sender", ""),
"content": parsed.content,
"date_received": meta.get("date_received") or parsed.date_received,
"emlx_path": str(emlx_path),
"attachments": parsed.attachments or [],
}
def get_disk_inventory(mail_dir: Path) -> dict[tuple[str, str, int], str]:
"""
Fast inventory of all emails on disk WITHOUT parsing content.
This walks the filesystem and extracts (account, mailbox, message_id)
from file paths. Much faster than scan_all_emails() since it doesn't
read file content.
Path structure:
V10/[account-uuid]/[mailbox].mbox/Data/.../Messages/[id].emlx
Args:
mail_dir: Path to ~/Library/Mail/V10/
Returns:
Dict mapping (account, mailbox, msg_id) -> emlx_path string
"""
inventory: dict[tuple[str, str, int], str] = {}
for emlx_path in scan_emlx_files(mail_dir):
try:
# Extract message ID from filename (handles .partial.emlx)
msg_id = extract_message_id(emlx_path)
# Infer account/mailbox from path
account, mailbox = _infer_account_mailbox(emlx_path, mail_dir)
inventory[(account, mailbox, msg_id)] = str(emlx_path)
except (ValueError, AttributeError):
# Skip files with non-numeric names
continue
return inventory
def _infer_account_mailbox(emlx_path: Path, mail_dir: Path) -> tuple[str, str]:
"""
Infer account and mailbox from .emlx file path.
Path structure: V10/account-uuid/mailbox.mbox/Data/.../Messages/id.emlx
"""
try:
relative = emlx_path.relative_to(mail_dir)
parts = relative.parts
# First part is account UUID
account = parts[0] if parts else "Unknown"
# Second part is mailbox.mbox
mailbox = "Unknown"
if len(parts) > 1:
mbox_part = parts[1]
if mbox_part.endswith(".mbox"):
mailbox = mbox_part[:-5] # Remove .mbox suffix
else:
mailbox = mbox_part
return (account, mailbox)
except ValueError:
return ("Unknown", "Unknown")