"""Reconciliation logic for maintaining search index consistency.
This module handles:
- Detecting and removing orphaned messages (deleted from chat.db)
- Detecting and re-indexing edited messages
- Periodic reconciliation scheduling
The reconciliation process ensures that the search index stays consistent
with the actual state of chat.db by removing deleted messages and updating
edited ones.
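
Typical usage (illustrative sketch; wire it into whatever scheduler or sync
loop the application already runs):

    stats = reconcile(interval_hours=24)
    if stats.status != "skipped":
        print(f"removed {stats.orphaned_deleted}, updated {stats.messages_updated}")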
"""
import logging
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from ..db.connection import get_connection
from ..db.parser import parse_message_text
from ..db.queries import normalize_handle, parse_chat_guid
from ..db.search_index import (
get_search_index_connection,
get_sync_metadata,
set_sync_metadata,
)
logger = logging.getLogger(__name__)
@dataclass
class ReconciliationStats:
"""Statistics from a reconciliation operation."""
orphaned_deleted: int
messages_updated: int
messages_failed: int
status: str # 'success', 'partial', 'failed', 'skipped'
def get_last_reconciled(conn: sqlite3.Connection) -> datetime | None:
"""Get the timestamp of the last reconciliation.
Args:
conn: SQLite connection to the search index database.
Returns:
Datetime of last reconciliation, or None if never reconciled.
"""
value = get_sync_metadata(conn, "last_reconciled")
if value:
try:
return datetime.fromisoformat(value)
except ValueError:
logger.warning(f"Invalid last_reconciled timestamp: {value}")
return None
def set_last_reconciled(conn: sqlite3.Connection, timestamp: datetime) -> None:
"""Set the timestamp of the last reconciliation.
Args:
conn: SQLite connection to the search index database.
timestamp: The datetime to record.
"""
set_sync_metadata(conn, "last_reconciled", timestamp.isoformat())
def should_reconcile(
conn: sqlite3.Connection,
interval_hours: int = 24,
) -> bool:
"""Check if reconciliation is needed based on the configured interval.
Args:
conn: SQLite connection to the search index database.
interval_hours: Minimum hours between reconciliation runs.
Returns:
True if reconciliation should run, False otherwise.
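
    Example (sketch; ``index_conn`` is any open search-index connection):

        if should_reconcile(index_conn, interval_hours=12):
            reconcile(interval_hours=12)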
"""
last_reconciled = get_last_reconciled(conn)
if last_reconciled is None:
return True
now = datetime.now(timezone.utc)
# Handle both timezone-aware and naive datetimes
if last_reconciled.tzinfo is None:
last_reconciled = last_reconciled.replace(tzinfo=timezone.utc)
elapsed = now - last_reconciled
threshold = timedelta(hours=interval_hours)
return elapsed >= threshold
def find_orphaned_messages(
index_conn: sqlite3.Connection,
chat_conn: sqlite3.Connection,
batch_size: int = 1000,
) -> list[int]:
"""Find messages in the index that no longer exist in chat.db.
    Indexed ROWIDs are checked against chat.db in batches with an ``IN``
    clause; any ROWID missing from the result set is treated as orphaned.
Args:
index_conn: SQLite connection to the search index database.
chat_conn: SQLite connection to chat.db.
batch_size: Number of rows to check per batch.
Returns:
List of orphaned message ROWIDs.
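
    Example (sketch; assumes both connections are already open):

        orphans = find_orphaned_messages(index_conn, chat_conn, batch_size=500)
        if orphans:
            delete_orphaned_messages(index_conn, orphans)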
"""
orphaned: list[int] = []
# Get all indexed rowids
cursor = index_conn.execute("SELECT rowid FROM message_index ORDER BY rowid")
indexed_rowids = [row[0] for row in cursor.fetchall()]
if not indexed_rowids:
return orphaned
# Check in batches by querying chat.db for existence
for i in range(0, len(indexed_rowids), batch_size):
batch = indexed_rowids[i : i + batch_size]
placeholders = ",".join("?" * len(batch))
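        # Each batch element becomes one bound parameter; SQLite builds older
        # than 3.32.0 cap host parameters at 999, so very old libraries may
        # need a smaller batch_size than the 1000 default.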
# Query chat.db to see which rowids exist
query = f"SELECT ROWID FROM message WHERE ROWID IN ({placeholders})"
cursor = chat_conn.execute(query, batch)
existing = {row[0] for row in cursor.fetchall()}
# Any rowid in batch but not in existing is orphaned
batch_orphaned = [rowid for rowid in batch if rowid not in existing]
orphaned.extend(batch_orphaned)
if batch_orphaned:
logger.debug(
f"Found {len(batch_orphaned)} orphaned messages in batch "
f"{i // batch_size + 1}"
)
return orphaned
def delete_orphaned_messages(
conn: sqlite3.Connection,
rowids: list[int],
) -> int:
"""Delete orphaned messages from the search index.
Removes entries from message_index, message_fts (via trigger),
and message_participants.
Args:
conn: SQLite connection to the search index database.
rowids: List of message ROWIDs to delete.
Returns:
Number of messages successfully deleted.
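
    Example (sketch; ``orphaned_rowids`` comes from find_orphaned_messages):

        deleted = delete_orphaned_messages(index_conn, orphaned_rowids)
        logger.info(f"Removed {deleted} stale index entries")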
"""
if not rowids:
return 0
deleted_count = 0
for rowid in rowids:
try:
# Delete from message_index (triggers will handle message_fts)
conn.execute("DELETE FROM message_index WHERE rowid = ?", (rowid,))
# Delete from message_participants
conn.execute("DELETE FROM message_participants WHERE rowid = ?", (rowid,))
deleted_count += 1
except Exception as e:
logger.warning(f"Failed to delete orphaned message {rowid}: {e}")
conn.commit()
return deleted_count
def _check_date_edited_column(chat_conn: sqlite3.Connection) -> bool:
"""Check if the date_edited column exists in the message table.
Args:
chat_conn: SQLite connection to chat.db.
Returns:
True if date_edited column exists, False otherwise.
"""
cursor = chat_conn.execute("PRAGMA table_info(message)")
columns = [row[1] for row in cursor.fetchall()]
return "date_edited" in columns
def _fetch_chat_participants(
chat_conn: sqlite3.Connection,
chat_id: int,
) -> list[str]:
"""Fetch all participants in a chat.
Args:
chat_conn: SQLite connection to chat.db.
chat_id: The chat ROWID.
Returns:
List of normalized participant identifiers.
"""
cursor = chat_conn.execute(
"""
SELECT h.id
FROM handle h
JOIN chat_handle_join chj ON h.ROWID = chj.handle_id
WHERE chj.chat_id = ?
""",
(chat_id,),
)
return [normalize_handle(row[0]) for row in cursor.fetchall()]
def find_edited_messages(
index_conn: sqlite3.Connection,
chat_conn: sqlite3.Connection,
batch_size: int = 1000,
) -> list[sqlite3.Row]:
"""Find messages that have been edited since being indexed.
Checks if date_edited column exists, and if so, compares it against
indexed messages to find those that need re-indexing.
Args:
index_conn: SQLite connection to the search index database.
chat_conn: SQLite connection to chat.db.
batch_size: Number of rows to check per batch.
Returns:
List of message rows that have been edited.
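
    Example (sketch; returns an empty list when the column is absent):

        edited = find_edited_messages(index_conn, chat_conn)
        logger.debug(f"{len(edited)} indexed messages carry edits")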
"""
# Check if date_edited column exists
if not _check_date_edited_column(chat_conn):
logger.debug("date_edited column not found in message table")
return []
edited: list[sqlite3.Row] = []
# Get all indexed rowids
cursor = index_conn.execute("SELECT rowid FROM message_index ORDER BY rowid")
indexed_rowids = [row[0] for row in cursor.fetchall()]
if not indexed_rowids:
return edited
# Check in batches for edited messages
for i in range(0, len(indexed_rowids), batch_size):
batch = indexed_rowids[i : i + batch_size]
placeholders = ",".join("?" * len(batch))
# Find messages where date_edited is not NULL (meaning they were edited)
query = f"""
SELECT
m.ROWID as rowid,
m.text,
m.attributedBody,
m.handle_id,
m.is_from_me,
m.service,
m.date as date_coredata,
m.date_edited,
h.id as sender_id,
c.ROWID as chat_id,
c.guid as chat_guid,
c.chat_identifier
FROM message m
LEFT JOIN handle h ON m.handle_id = h.ROWID
JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
JOIN chat c ON cmj.chat_id = c.ROWID
WHERE m.ROWID IN ({placeholders})
AND m.date_edited IS NOT NULL
AND m.date_edited > 0
"""
cursor = chat_conn.execute(query, batch)
batch_edited = cursor.fetchall()
if batch_edited:
edited.extend(batch_edited)
logger.debug(
f"Found {len(batch_edited)} edited messages in batch "
f"{i // batch_size + 1}"
)
return edited
def update_edited_messages(
index_conn: sqlite3.Connection,
chat_conn: sqlite3.Connection,
messages: list[sqlite3.Row],
) -> tuple[int, int]:
"""Update edited messages in the search index.
Re-indexes messages that have been edited, updating their text content
and metadata.
Args:
index_conn: SQLite connection to the search index database.
chat_conn: SQLite connection to chat.db.
messages: List of edited message rows to update.
Returns:
Tuple of (updated_count, failed_count).
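
    Example (sketch; ``edited`` comes from find_edited_messages):

        updated, failed = update_edited_messages(index_conn, chat_conn, edited)
        if failed:
            logger.warning(f"{failed} edited messages could not be re-indexed")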
"""
if not messages:
return 0, 0
updated_count = 0
failed_count = 0
# Cache for chat participants
participants_cache: dict[int, list[str]] = {}
for msg in messages:
try:
# Parse message text
text = parse_message_text(msg)
if text is None:
text = ""
# Get sender
sender = msg["sender_id"]
if sender:
sender = normalize_handle(sender)
elif msg["is_from_me"]:
sender = "me"
else:
sender = None
# Parse chat GUID to determine if group
chat_guid = msg["chat_guid"]
_, is_group, _ = (
parse_chat_guid(chat_guid) if chat_guid else (None, False, None)
)
# Get chat participants (cached)
chat_id = msg["chat_id"]
if chat_id not in participants_cache:
participants_cache[chat_id] = _fetch_chat_participants(
chat_conn, chat_id
)
participants = participants_cache[chat_id]
# Update message_index (triggers will handle FTS update)
index_conn.execute(
"""
UPDATE message_index
SET text = ?,
handle_id = ?,
sender = ?,
chat_id = ?,
chat_identifier = ?,
is_group = ?,
is_from_me = ?,
service = ?,
date_coredata = ?
WHERE rowid = ?
""",
(
text,
msg["handle_id"],
sender,
chat_id,
msg["chat_identifier"],
1 if is_group else 0,
msg["is_from_me"],
msg["service"],
msg["date_coredata"],
msg["rowid"],
),
)
# Update participants (delete old, insert new)
index_conn.execute(
"DELETE FROM message_participants WHERE rowid = ?", (msg["rowid"],)
)
for participant in participants:
index_conn.execute(
"""
INSERT OR IGNORE INTO message_participants (rowid, participant)
VALUES (?, ?)
""",
(msg["rowid"], participant),
)
updated_count += 1
except Exception as e:
failed_count += 1
logger.warning(f"Failed to update edited message {msg['rowid']}: {e}")
index_conn.commit()
return updated_count, failed_count
def reconcile(
index_path: str | None = None,
interval_hours: int = 24,
force: bool = False,
) -> ReconciliationStats:
"""Main reconciliation function that orchestrates the entire process.
This function:
1. Checks if reconciliation is needed based on interval
2. Finds and deletes orphaned messages
3. Finds and updates edited messages
4. Updates last_reconciled timestamp
Args:
index_path: Optional custom path to search index database.
interval_hours: Minimum hours between reconciliation runs.
force: If True, skip the interval check and run immediately.
Returns:
ReconciliationStats with operation results.
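
    Example (sketch; ``force=True`` bypasses the interval check):

        stats = reconcile(force=True)
        if stats.status == "partial":
            logger.warning(f"{stats.messages_failed} messages failed to update")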
"""
with get_search_index_connection(path=index_path) as index_conn:
# Check if we should run
if not force and not should_reconcile(index_conn, interval_hours):
logger.debug("Reconciliation not needed yet")
return ReconciliationStats(
orphaned_deleted=0,
messages_updated=0,
messages_failed=0,
status="skipped",
)
try:
with get_connection() as chat_conn:
logger.info("Starting reconciliation process")
# Find and delete orphaned messages
orphaned = find_orphaned_messages(index_conn, chat_conn)
orphaned_deleted = 0
if orphaned:
logger.info(f"Found {len(orphaned)} orphaned messages")
orphaned_deleted = delete_orphaned_messages(index_conn, orphaned)
logger.info(f"Deleted {orphaned_deleted} orphaned messages")
# Find and update edited messages
edited = find_edited_messages(index_conn, chat_conn)
messages_updated = 0
messages_failed = 0
if edited:
logger.info(f"Found {len(edited)} edited messages")
messages_updated, messages_failed = update_edited_messages(
index_conn, chat_conn, edited
)
logger.info(
f"Updated {messages_updated} edited messages "
f"({messages_failed} failed)"
)
# Update last_reconciled timestamp
set_last_reconciled(index_conn, datetime.now(timezone.utc))
                # Determine status: any failure means partial; otherwise success,
                # including runs where there was nothing to do.
                status = "partial" if messages_failed > 0 else "success"
logger.info("Reconciliation complete")
return ReconciliationStats(
orphaned_deleted=orphaned_deleted,
messages_updated=messages_updated,
messages_failed=messages_failed,
status=status,
)
except Exception as e:
logger.error(f"Reconciliation failed: {e}")
return ReconciliationStats(
orphaned_deleted=0,
messages_updated=0,
messages_failed=0,
status="failed",
)