"""Search index database management for iMessage search and embeddings.
This module manages a separate SQLite database (search_index.db) that stores:
- Indexed message metadata for fast searching
- Full-text search index (FTS5)
- Message embeddings for semantic search
- Sync state tracking
The search index is stored at ~/.local/share/jons-mcp-imessage/search_index.db
and is separate from the read-only chat.db.
"""
import os
import sqlite3
from collections.abc import Callable, Generator
from contextlib import contextmanager
# Schema version this code reads and writes. Increment when making breaking
# schema changes; migrate_schema() compares this against the version stored
# in the database.
CURRENT_SCHEMA_VERSION = 1
# Alias for external imports
SCHEMA_VERSION = CURRENT_SCHEMA_VERSION
# Default location of the search index database. "~" is expanded once, at
# import time, so these hold the expanded directory and file path.
SEARCH_INDEX_DIR = os.path.expanduser("~/.local/share/jons-mcp-imessage")
SEARCH_INDEX_PATH = os.path.join(SEARCH_INDEX_DIR, "search_index.db")
def get_search_index_path() -> str:
    """Return the default location of the search index database.

    The module-level SEARCH_INDEX_PATH is passed through
    os.path.expanduser again, so a value that still starts with "~"
    (for example one overridden after import) is expanded as well.

    Returns:
        Path to search_index.db with any leading "~" expanded.
    """
    expanded_path = os.path.expanduser(SEARCH_INDEX_PATH)
    return expanded_path
@contextmanager
def get_search_index_connection(
path: str | None = None,
timeout: float = 30.0,
) -> Generator[sqlite3.Connection, None, None]:
"""Get a read-write connection to the search index database.
Creates the database directory and initializes schema if needed.
Args:
path: Optional custom path to search index database.
If None, uses default path.
timeout: Busy timeout in seconds (how long to wait if DB is locked).
Yields:
SQLite connection configured for read-write access with optimizations.
"""
db_path = path if path else get_search_index_path()
# Ensure directory exists
os.makedirs(os.path.dirname(db_path), exist_ok=True)
# Check if this is first-time creation
is_new_db = not os.path.exists(db_path)
try:
conn = sqlite3.connect(
db_path,
timeout=timeout,
check_same_thread=False,
)
# Enable row factory for dict-like access
conn.row_factory = sqlite3.Row
# Set performance and reliability PRAGMAs
conn.execute("PRAGMA journal_mode = WAL") # Write-Ahead Logging for concurrency
conn.execute("PRAGMA synchronous = NORMAL") # Balance safety and performance
conn.execute("PRAGMA cache_size = -64000") # 64MB cache (negative = KB)
conn.execute("PRAGMA temp_store = MEMORY") # Keep temp tables in memory
conn.execute("PRAGMA mmap_size = 268435456") # 256MB memory-mapped I/O
conn.execute(f"PRAGMA busy_timeout = {int(timeout * 1000)}")
# Initialize schema if this is a new database
if is_new_db:
ensure_schema(conn)
yield conn
finally:
if "conn" in locals():
conn.close()
def ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the search index schema if missing, then run migrations.

    All tables, indexes and triggers are created with IF NOT EXISTS and the
    sync_metadata defaults are inserted with INSERT OR IGNORE, so calling
    this on an already-initialized database leaves existing objects and
    stored values untouched. The inserts are committed before
    migrate_schema() is invoked.

    Args:
        conn: SQLite connection to the search index database.
    """
    # Statements run strictly in this order: the indexes, the FTS table and
    # the triggers all refer to message_index, which is created first.
    schema_statements = (
        # Main message index table
        """
        CREATE TABLE IF NOT EXISTS message_index (
            rowid INTEGER PRIMARY KEY, -- Same as message.ROWID in chat.db
            text TEXT, -- Message text content
            handle_id INTEGER, -- Handle ID (foreign key to chat.db)
            sender TEXT, -- Normalized sender identifier
            chat_id INTEGER, -- Chat ID (foreign key to chat.db)
            chat_identifier TEXT, -- Chat identifier (phone/email)
            is_group INTEGER, -- 1 if group chat, 0 otherwise
            is_from_me INTEGER, -- 1 if sent by me, 0 if received
            service TEXT, -- 'iMessage' or 'SMS'
            date_coredata INTEGER, -- CoreData timestamp
            embedding BLOB -- Numpy array stored as bytes (optional)
        )
        """,
        # Composite indexes for common query patterns
        """
        CREATE INDEX IF NOT EXISTS idx_message_index_chat_date
        ON message_index(chat_id, date_coredata)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_message_index_service_date
        ON message_index(service, date_coredata)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_message_index_sender
        ON message_index(sender)
        """,
        # FTS5 virtual table for full-text search. It is an external-content
        # table over message_index so the text is not stored twice; the
        # unicode61 tokenizer handles accents and case-folding.
        """
        CREATE VIRTUAL TABLE IF NOT EXISTS message_fts USING fts5(
            text,
            content='message_index',
            content_rowid='rowid',
            tokenize='unicode61 remove_diacritics 1'
        )
        """,
        # Triggers that keep the FTS index in sync with message_index.
        # Removal from the FTS index is done by inserting the special
        # 'delete' command row carrying the old rowid and text.
        """
        CREATE TRIGGER IF NOT EXISTS message_fts_insert
        AFTER INSERT ON message_index BEGIN
            INSERT INTO message_fts(rowid, text)
            VALUES (new.rowid, new.text);
        END
        """,
        """
        CREATE TRIGGER IF NOT EXISTS message_fts_delete
        AFTER DELETE ON message_index BEGIN
            INSERT INTO message_fts(message_fts, rowid, text)
            VALUES('delete', old.rowid, old.text);
        END
        """,
        """
        CREATE TRIGGER IF NOT EXISTS message_fts_update
        AFTER UPDATE ON message_index BEGIN
            INSERT INTO message_fts(message_fts, rowid, text)
            VALUES('delete', old.rowid, old.text);
            INSERT INTO message_fts(rowid, text)
            VALUES (new.rowid, new.text);
        END
        """,
        # Junction table for group chat filtering; allows efficient queries
        # like "messages where participant X is in the chat".
        """
        CREATE TABLE IF NOT EXISTS message_participants (
            rowid INTEGER NOT NULL, -- Message rowid
            participant TEXT NOT NULL, -- Normalized participant identifier
            PRIMARY KEY (rowid, participant)
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_message_participants_participant
        ON message_participants(participant)
        """,
        # Key-value store used to track sync state and the schema version
        """
        CREATE TABLE IF NOT EXISTS sync_metadata (
            key TEXT PRIMARY KEY,
            value TEXT
        )
        """,
    )
    for statement in schema_statements:
        conn.execute(statement)

    # Record the schema version. OR IGNORE keeps an already-stored version,
    # which is what lets migrate_schema() detect an older database.
    conn.execute(
        """
        INSERT OR IGNORE INTO sync_metadata (key, value)
        VALUES ('schema_version', ?)
        """,
        (str(CURRENT_SCHEMA_VERSION),),
    )

    # Defaults for the remaining sync tracking fields; existing values win.
    default_metadata_statements = (
        """
        INSERT OR IGNORE INTO sync_metadata (key, value)
        VALUES ('last_indexed_rowid', '0')
        """,
        """
        INSERT OR IGNORE INTO sync_metadata (key, value)
        VALUES ('last_embedded_rowid', '0')
        """,
        """
        INSERT OR IGNORE INTO sync_metadata (key, value)
        VALUES ('last_sync_timestamp', '')
        """,
        """
        INSERT OR IGNORE INTO sync_metadata (key, value)
        VALUES ('last_reconciled', '')
        """,
    )
    for statement in default_metadata_statements:
        conn.execute(statement)

    conn.commit()

    # Apply any necessary schema migrations
    migrate_schema(conn)
def get_schema_version(conn: sqlite3.Connection) -> int:
    """Read the schema version recorded in the database.

    Args:
        conn: SQLite connection to the search index database.

    Returns:
        The stored "schema_version" metadata value converted to an integer,
        or 0 when the key is missing or holds an empty value.
    """
    stored_version = get_sync_metadata(conn, "schema_version")
    # None (no row) and "" (empty value) both mean "no version recorded".
    if not stored_version:
        return 0
    return int(stored_version)
def set_schema_version(conn: sqlite3.Connection, version: int) -> None:
    """Record the schema version in the database.

    The version is stored as text under the "schema_version" key via
    set_sync_metadata().

    Args:
        conn: SQLite connection to the search index database.
        version: Schema version to set.
    """
    version_text = str(version)
    set_sync_metadata(conn, "schema_version", version_text)
# Registry of migration functions, keyed by the schema version each one
# upgrades the database to.
_MIGRATIONS: dict[int, Callable[[sqlite3.Connection], None]] = {}


def _register_migration(version: int) -> Callable:
    """Build a decorator that registers a migration for a schema version.

    The decorated function is stored in _MIGRATIONS under ``version``
    (replacing any function previously registered for that version) and is
    returned unchanged. By convention a migration upgrades the schema from
    (version - 1) to ``version``.

    Args:
        version: Target schema version after this migration is applied.

    Returns:
        A decorator that accepts the migration function.
    """

    def register(
        migration: Callable[[sqlite3.Connection], None],
    ) -> Callable[[sqlite3.Connection], None]:
        _MIGRATIONS[version] = migration
        return migration

    return register
def migrate_schema(conn: sqlite3.Connection) -> None:
    """Bring the database schema up to CURRENT_SCHEMA_VERSION.

    Walks every version between the one stored in the database and
    CURRENT_SCHEMA_VERSION in ascending order. For each step the migration
    registered in _MIGRATIONS (if any) is run, then the stored version is
    advanced to that step. When the database is already at the current
    version nothing happens, so repeated calls are harmless.

    Args:
        conn: SQLite connection to the search index database.

    Raises:
        RuntimeError: If the database reports a schema version newer than
            this code supports.
    """
    db_version = get_schema_version(conn)
    supported_version = CURRENT_SCHEMA_VERSION

    # A database written by newer code cannot be safely handled here.
    if db_version > supported_version:
        raise RuntimeError(
            f"Database schema version ({db_version}) is newer than "
            f"supported version ({supported_version}). "
            "Please upgrade to a newer version of this software."
        )

    # The range is empty when the schema is already up-to-date.
    for step in range(db_version + 1, supported_version + 1):
        # Versions without a registered migration are still recorded, so the
        # stored version always advances one step at a time.
        if step in _MIGRATIONS:
            _MIGRATIONS[step](conn)
        set_schema_version(conn, step)
def get_sync_metadata(conn: sqlite3.Connection, key: str) -> str | None:
"""Get a value from sync_metadata table.
Args:
conn: SQLite connection to the search index database.
key: Metadata key to retrieve.
Returns:
Value for the key, or None if not found.
"""
cursor = conn.execute("SELECT value FROM sync_metadata WHERE key = ?", (key,))
row = cursor.fetchone()
return row[0] if row else None
def set_sync_metadata(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Store a value in the sync_metadata key-value table and commit.

    An existing row for ``key`` is replaced. The connection is committed
    immediately, which also commits any other pending changes on it.

    Args:
        conn: SQLite connection to the search index database.
        key: Metadata key to set.
        value: Value to store.
    """
    upsert_sql = "INSERT OR REPLACE INTO sync_metadata (key, value) VALUES (?, ?)"
    params = (key, value)
    conn.execute(upsert_sql, params)
    conn.commit()