"""Database connection management for macOS Contacts (AddressBook).
The macOS Contacts app stores contact information in SQLite databases using Core Data.
Multiple databases may exist:
- Main database: ~/Library/Application Support/AddressBook/AddressBook-v22.abcddb
- Per-source databases: ~/Library/Application Support/AddressBook/Sources/*/AddressBook-v22.abcddb
This module provides:
- Discovery of all contacts databases (main + per-source)
- Read-only connection management with permission handling
- Access permission checking
- Contact resolution (phone/email -> display name)
- Thread-safe caching for contact lookups
"""
import asyncio
import logging
import sqlite3
from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from ..constants import CONTACTS_DB_PATH, CONTACTS_SOURCES_PATH
from ..exceptions import ContactsPermissionError, DatabaseLockedError
from .queries import normalize_for_contact_matching
logger = logging.getLogger(__name__)
# Global flag indicating if Contacts access is available
# Set to False if we detect permission issues on first access attempt
CONTACTS_ACCESS_AVAILABLE: bool = True
def discover_contacts_databases() -> list[Path]:
"""Discover all Contacts database files (main + per-source).
Searches for:
1. Main database: ~/Library/Application Support/AddressBook/AddressBook-v22.abcddb
2. Source databases: ~/Library/Application Support/AddressBook/Sources/*/AddressBook-v22.abcddb
Returns:
List of Path objects for all discovered .abcddb files.
Returns empty list if base directories don't exist.
"""
databases: list[Path] = []
# Check main database
if CONTACTS_DB_PATH.exists():
databases.append(CONTACTS_DB_PATH)
logger.debug(f"Found main contacts database: {CONTACTS_DB_PATH}")
# Check per-source databases
if CONTACTS_SOURCES_PATH.exists() and CONTACTS_SOURCES_PATH.is_dir():
# Find all AddressBook-v22.abcddb files in Sources subdirectories
source_dbs = list(CONTACTS_SOURCES_PATH.glob("*/AddressBook-v22.abcddb"))
databases.extend(source_dbs)
logger.debug(f"Found {len(source_dbs)} source database(s): {source_dbs}")
logger.info(f"Discovered {len(databases)} total contacts database(s)")
return databases
def check_contacts_access() -> dict[str, bool | str | list[str]]:
"""Check if we have access to the Contacts databases.
Returns:
Dictionary with access status and any error messages:
- main_db_exists: bool - Main database file exists
- main_db_readable: bool - Main database is readable
- sources_path_exists: bool - Sources directory exists
- discovered_databases: list[str] - Paths to all discovered databases
- accessible_databases: list[str] - Paths to accessible databases
- error: str (optional) - Error message if access is denied
"""
result: dict[str, bool | str | list[str]] = {
"main_db_exists": CONTACTS_DB_PATH.exists(),
"main_db_readable": (
CONTACTS_DB_PATH.exists()
and CONTACTS_DB_PATH.is_file()
and bool(
CONTACTS_DB_PATH.stat().st_mode & 0o400
) # Check read permission bit
),
"sources_path_exists": CONTACTS_SOURCES_PATH.exists(),
}
# Discover all databases
discovered = discover_contacts_databases()
result["discovered_databases"] = [str(db) for db in discovered]
# Test access to each database
accessible = []
for db_path in discovered:
try:
# Try to open a connection to test access
with get_contacts_connection(db_path, timeout=1.0):
accessible.append(str(db_path))
except (ContactsPermissionError, DatabaseLockedError) as e:
logger.warning(f"Cannot access {db_path}: {e}")
result["accessible_databases"] = accessible
# Set error message if no databases are accessible
if discovered and not accessible:
result["error"] = (
"Cannot access Contacts databases. Please grant Contacts access to "
"Terminal.app or Claude Desktop.app in System Settings > "
"Privacy & Security > Contacts."
)
return result
@contextmanager
def get_contacts_connection(
db_path: Path,
timeout: float = 5.0,
) -> Generator[sqlite3.Connection, None, None]:
"""Get a read-only connection to a Contacts database.
Uses read-only connection parameters to safely access the database
while the Contacts app may be running.
Args:
db_path: Path to the .abcddb database file.
timeout: Busy timeout in seconds (how long to wait if DB is locked).
Yields:
SQLite connection configured for read-only access.
Raises:
ContactsPermissionError: If Contacts access is not granted.
DatabaseLockedError: If the database remains locked after timeout.
Example:
>>> with get_contacts_connection(CONTACTS_DB_PATH) as conn:
... cursor = conn.execute("SELECT * FROM ZABCDRECORD LIMIT 1")
... row = cursor.fetchone()
"""
global CONTACTS_ACCESS_AVAILABLE
if not db_path.exists():
raise ContactsPermissionError(
f"Contacts database not found: {db_path}. "
"Ensure Contacts app is set up on this system."
)
if not db_path.is_file():
raise ContactsPermissionError(
f"Contacts database path is not a file: {db_path}"
)
# Use URI mode for read-only access
# mode=ro ensures we don't accidentally modify the database
uri = f"file:{db_path}?mode=ro"
conn = None
try:
conn = sqlite3.connect(
uri,
uri=True,
timeout=timeout,
check_same_thread=False,
)
# Enable row factory for dict-like access
conn.row_factory = sqlite3.Row
# Set busy timeout in milliseconds
conn.execute(f"PRAGMA busy_timeout = {int(timeout * 1000)}")
yield conn
except sqlite3.OperationalError as e:
error_msg = str(e).lower()
if "locked" in error_msg or "busy" in error_msg:
# Database is locked (Contacts app is using it)
raise DatabaseLockedError(
f"Contacts database is locked: {db_path}. "
"This may occur when Contacts.app is running. Please try again."
) from e
elif (
"unable to open" in error_msg
or "permission" in error_msg
or "denied" in error_msg
):
# Permission denied
CONTACTS_ACCESS_AVAILABLE = False
raise ContactsPermissionError(
f"Cannot open Contacts database: {db_path}. "
"Please grant Contacts access to Terminal.app or Claude Desktop.app "
"in System Settings > Privacy & Security > Contacts."
) from e
else:
# Unknown error
logger.error(f"Unexpected error opening Contacts database: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error with Contacts database {db_path}: {e}")
raise
finally:
if conn is not None:
conn.close()
@dataclass
class ContactInfo:
"""Contact information from the Contacts database.
Attributes:
first_name: Contact's first name
last_name: Contact's last name
nickname: Contact's nickname
organization: Contact's organization/company name
"""
first_name: str | None
last_name: str | None
nickname: str | None
organization: str | None
def get_display_name(self) -> str | None:
"""Return display name with fallbacks: nickname -> first+last -> organization.
Returns:
Display name string, or None if no name information is available.
"""
# Priority 1: Nickname
if self.nickname:
return self.nickname
# Priority 2: First + Last name
if self.first_name and self.last_name:
return f"{self.first_name} {self.last_name}"
elif self.first_name:
return self.first_name
elif self.last_name:
return self.last_name
# Priority 3: Organization
if self.organization:
return self.organization
return None
class ContactResolver:
"""Resolve phone numbers and emails to contact display names.
Loads contact information from all discovered Contacts databases and
provides lookup functionality to resolve handles (phone/email) to
human-readable display names.
Example:
>>> resolver = ContactResolver()
>>> resolver.load_all_contacts()
>>> display_name = resolver.resolve("+15551234567")
>>> print(display_name) # "John Smith" or None if not found
"""
def __init__(self) -> None:
"""Initialize empty contact mappings."""
self.phone_to_contact: dict[str, ContactInfo] = {}
self.email_to_contact: dict[str, ContactInfo] = {}
def load_contacts_from_db(self, db_path: Path) -> int:
"""Load contacts from a single database.
Args:
db_path: Path to the .abcddb database file
Returns:
Number of contacts loaded (unique contact records)
Raises:
ContactsPermissionError: If Contacts access is not granted
DatabaseLockedError: If the database is locked
"""
contacts_loaded = 0
with get_contacts_connection(db_path) as conn:
# Query to load all contacts with their phone numbers and emails
# Z_ENT = 22 is the ABCDContact entity type
query = """
SELECT
r.Z_PK,
r.ZFIRSTNAME,
r.ZLASTNAME,
r.ZNICKNAME,
r.ZORGANIZATION,
p.ZFULLNUMBER,
e.ZADDRESS
FROM ZABCDRECORD r
LEFT JOIN ZABCDPHONENUMBER p ON r.Z_PK = p.ZOWNER
LEFT JOIN ZABCDEMAILADDRESS e ON r.Z_PK = e.ZOWNER
WHERE r.Z_ENT = 22
"""
cursor = conn.execute(query)
# Track which contact records we've seen
seen_contacts: set[int] = set()
for row in cursor:
contact_id = row["Z_PK"]
# Create ContactInfo for this record
contact_info = ContactInfo(
first_name=row["ZFIRSTNAME"],
last_name=row["ZLASTNAME"],
nickname=row["ZNICKNAME"],
organization=row["ZORGANIZATION"],
)
# Track unique contacts
if contact_id not in seen_contacts:
seen_contacts.add(contact_id)
contacts_loaded += 1
# Map phone number to contact (if present)
phone = row["ZFULLNUMBER"]
if phone:
normalized_phone = normalize_for_contact_matching(phone)
# First match wins - don't overwrite existing mappings
if normalized_phone not in self.phone_to_contact:
self.phone_to_contact[normalized_phone] = contact_info
# Map email to contact (if present)
email = row["ZADDRESS"]
if email:
normalized_email = normalize_for_contact_matching(email)
# First match wins - don't overwrite existing mappings
if normalized_email not in self.email_to_contact:
self.email_to_contact[normalized_email] = contact_info
logger.info(
f"Loaded {contacts_loaded} contacts from {db_path} "
f"({len(self.phone_to_contact)} phone mappings, "
f"{len(self.email_to_contact)} email mappings)"
)
return contacts_loaded
def load_all_contacts(self) -> dict[str, int]:
"""Load contacts from all discovered databases.
Aggregates contacts from all available Contacts databases (main + sources).
For duplicate handles across databases, the first match wins.
Returns:
Dictionary with statistics:
- databases_found: Number of databases discovered
- databases_loaded: Number of databases successfully loaded
- total_contacts: Total unique contact records loaded
- total_phone_mappings: Total phone number mappings
- total_email_mappings: Total email mappings
"""
# Clear existing mappings
self.phone_to_contact.clear()
self.email_to_contact.clear()
databases = discover_contacts_databases()
databases_loaded = 0
total_contacts = 0
for db_path in databases:
try:
contacts_loaded = self.load_contacts_from_db(db_path)
total_contacts += contacts_loaded
databases_loaded += 1
except (ContactsPermissionError, DatabaseLockedError) as e:
logger.warning(f"Failed to load contacts from {db_path}: {e}")
continue
logger.info(
f"Loaded contacts from {databases_loaded}/{len(databases)} databases: "
f"{total_contacts} contacts, {len(self.phone_to_contact)} phone mappings, "
f"{len(self.email_to_contact)} email mappings"
)
return {
"databases_found": len(databases),
"databases_loaded": databases_loaded,
"total_contacts": total_contacts,
"total_phone_mappings": len(self.phone_to_contact),
"total_email_mappings": len(self.email_to_contact),
}
def resolve(self, handle: str) -> str | None:
"""Resolve a phone/email handle to a display name.
Args:
handle: Phone number or email address to resolve
Returns:
Display name if found, or None if:
- Handle is "me" (sender is current user)
- No matching contact is found
- Contact has no displayable name
"""
# Special case: "me" is the current user, don't resolve
if handle == "me":
return None
# Normalize the handle for lookup
normalized = normalize_for_contact_matching(handle)
# Try phone lookup first
contact_info = self.phone_to_contact.get(normalized)
# If not found, try email lookup
if contact_info is None:
contact_info = self.email_to_contact.get(normalized)
# Return display name if contact was found
if contact_info is not None:
return contact_info.get_display_name()
return None
class ContactCache:
"""Thread-safe cache for contact name lookups.
Uses a Load-All strategy: loads all contacts from all databases on first access.
Provides thread-safe access via asyncio.Lock and efficient bulk lookups.
Example:
>>> cache = get_contact_cache()
>>> name = await cache.resolve_name("+15551234567")
>>> print(name) # "John Smith" or None if not found
>>>
>>> # Bulk lookup
>>> handles = ["+15551234567", "john@example.com", "+15559876543"]
>>> names = await cache.resolve_names_batch(handles)
>>> print(names) # {"+15551234567": "John Smith", ...}
"""
def __init__(self) -> None:
"""Initialize the cache with empty state."""
self._resolver: ContactResolver | None = None
self._lock = asyncio.Lock()
self._loaded = False
@property
def is_available(self) -> bool:
"""Check if contacts access is available.
Returns:
True if contacts can be accessed, False if permission denied.
"""
return CONTACTS_ACCESS_AVAILABLE
async def _ensure_loaded(self) -> None:
"""Load contacts on first access (under lock).
Thread-safe lazy loading: only the first caller loads contacts,
subsequent callers wait for the load to complete.
"""
async with self._lock:
if not self._loaded:
self._resolver = ContactResolver()
self._resolver.load_all_contacts()
self._loaded = True
async def resolve_name(self, handle: str) -> str | None:
"""Resolve a single handle to a contact name.
Args:
handle: Phone number or email address to resolve
Returns:
Display name if found, None if not found or contacts unavailable.
"""
if not self.is_available:
return None
await self._ensure_loaded()
return self._resolver.resolve(handle) if self._resolver else None
async def resolve_names_batch(self, handles: list[str]) -> dict[str, str | None]:
"""Resolve multiple handles efficiently.
Args:
handles: List of phone numbers or email addresses to resolve
Returns:
Dictionary mapping each handle to its resolved name (or None).
"""
if not self.is_available:
return {h: None for h in handles}
await self._ensure_loaded()
return {
h: self._resolver.resolve(h) if self._resolver else None for h in handles
}
async def refresh(self) -> None:
"""Force reload of contacts from database.
Clears the current cache and reloads all contacts from all databases.
Thread-safe: uses lock to prevent concurrent access during reload.
"""
async with self._lock:
self._resolver = ContactResolver()
self._resolver.load_all_contacts()
self._loaded = True
# Module-level singleton instance
_contact_cache: ContactCache | None = None
def get_contact_cache() -> ContactCache:
"""Get or create the singleton contact cache.
Returns:
The global ContactCache instance.
Example:
>>> cache = get_contact_cache()
>>> name = await cache.resolve_name("+15551234567")
"""
global _contact_cache
if _contact_cache is None:
_contact_cache = ContactCache()
return _contact_cache