"""Incremental sync for iMessage search index.
This module provides ROWID-based incremental synchronization between the
iMessage chat.db and our search index database. It efficiently tracks
which messages have been indexed and only fetches new messages.
The sync process:
1. Fast check: Compare the last indexed ROWID against MAX(ROWID) in chat.db
2. If new messages exist, fetch and parse them
3. Insert into search index with FTS5 support
4. Update sync metadata with last indexed rowid
First-run behavior:
- On first search, detects if index is empty (last_indexed_rowid = 0)
- Indexes only the most recent N messages synchronously (default: 1000)
- Spawns a background thread to backfill the remaining older messages
- Tracks backfill progress in sync_metadata
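
Usage sketch (illustrative; adjust the import to your package name):

    from mypackage.sync import sync_messages

    stats = sync_messages()  # incremental sync of all new messages
    if stats.status == "stale":
        ...  # chat.db was locked; search falls back to the existing index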
"""
import logging
import sqlite3
import threading
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from ..db.connection import get_connection
from ..db.parser import parse_message_text
from ..db.queries import normalize_handle, parse_chat_guid
from ..db.search_index import (
get_search_index_connection,
get_sync_metadata,
set_sync_metadata,
)
from ..exceptions import DatabaseLockedError, SyncError
logger = logging.getLogger(__name__)
@dataclass
class SyncStats:
"""Statistics from a sync operation."""
messages_synced: int
messages_failed: int
last_indexed_rowid: int
    status: str  # 'success', 'partial', 'failed', 'stale', 'up_to_date'
@dataclass
class SyncCheck:
"""Result of fast sync check."""
needs_sync: bool
new_count: int
index_max_rowid: int
chat_max_rowid: int
@dataclass
class BackfillStatus:
"""Status of background backfill operation."""
status: str # 'pending', 'in_progress', 'complete', 'error'
backfill_progress_rowid: int # Lowest ROWID that has been backfilled
total_to_backfill: int # Total number of messages to backfill
messages_backfilled: int # Number of messages backfilled so far
error: str | None = None # Error message if status is 'error'
def get_last_indexed_rowid(conn: sqlite3.Connection) -> int:
"""Get the last indexed message ROWID from sync metadata.
Args:
conn: SQLite connection to the search index database.
Returns:
Last indexed ROWID, or 0 if no messages have been indexed.
"""
value = get_sync_metadata(conn, "last_indexed_rowid")
return int(value) if value else 0
def set_last_indexed_rowid(conn: sqlite3.Connection, rowid: int) -> None:
"""Set the last indexed message ROWID in sync metadata.
Args:
conn: SQLite connection to the search index database.
rowid: The ROWID of the last indexed message.
"""
set_sync_metadata(conn, "last_indexed_rowid", str(rowid))
def is_first_run(conn: sqlite3.Connection) -> bool:
"""Check if this is the first run (index is empty).
Args:
conn: SQLite connection to the search index database.
Returns:
True if last_indexed_rowid is 0 or NULL, False otherwise.
"""
last_indexed = get_last_indexed_rowid(conn)
return last_indexed == 0
def get_backfill_status(conn: sqlite3.Connection) -> BackfillStatus:
"""Get the current backfill status from sync metadata.
Args:
conn: SQLite connection to the search index database.
Returns:
BackfillStatus with current backfill state.
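
    Example (sketch; assumes an open index connection):

        status = get_backfill_status(conn)
        if status.status == "in_progress":
            pct = 100 * status.messages_backfilled / max(1, status.total_to_backfill)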
"""
status = get_sync_metadata(conn, "backfill_status") or "complete"
progress_rowid = int(get_sync_metadata(conn, "backfill_progress_rowid") or "0")
total_to_backfill = int(get_sync_metadata(conn, "backfill_total") or "0")
messages_backfilled = int(get_sync_metadata(conn, "backfill_count") or "0")
error = get_sync_metadata(conn, "backfill_error")
return BackfillStatus(
status=status,
backfill_progress_rowid=progress_rowid,
total_to_backfill=total_to_backfill,
messages_backfilled=messages_backfilled,
error=error,
)
def set_backfill_status(
conn: sqlite3.Connection,
status: str,
progress_rowid: int | None = None,
total: int | None = None,
count: int | None = None,
error: str | None = None,
) -> None:
"""Update backfill status in sync metadata.
Args:
conn: SQLite connection to the search index database.
status: Backfill status ('pending', 'in_progress', 'complete', 'error').
progress_rowid: Current backfill progress ROWID (optional).
total: Total messages to backfill (optional).
count: Number of messages backfilled so far (optional).
error: Error message if status is 'error' (optional).
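
    Example (sketch; records a batch of backfill progress):

        set_backfill_status(conn, "in_progress", progress_rowid=42000, count=5000)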
"""
set_sync_metadata(conn, "backfill_status", status)
if progress_rowid is not None:
set_sync_metadata(conn, "backfill_progress_rowid", str(progress_rowid))
if total is not None:
set_sync_metadata(conn, "backfill_total", str(total))
if count is not None:
set_sync_metadata(conn, "backfill_count", str(count))
if error is not None:
set_sync_metadata(conn, "backfill_error", error)
elif status != "error":
# Clear error when setting non-error status
set_sync_metadata(conn, "backfill_error", "")
def _get_max_rowid_from_index(conn: sqlite3.Connection) -> int:
"""Get the maximum ROWID from the message_index table.
    SQLite answers MAX(rowid) on a rowid table with an O(1) seek, so this
    stays cheap even when the index holds millions of messages.
Args:
conn: SQLite connection to the search index database.
Returns:
Maximum ROWID in message_index, or 0 if empty.
"""
cursor = conn.execute("SELECT MAX(rowid) FROM message_index")
row = cursor.fetchone()
return row[0] if row[0] is not None else 0
def _get_max_rowid_from_chat(conn: sqlite3.Connection) -> int:
"""Get the maximum ROWID from chat.db message table.
Args:
conn: SQLite connection to chat.db.
Returns:
Maximum ROWID in message table, or 0 if empty.
"""
cursor = conn.execute("SELECT MAX(ROWID) FROM message")
row = cursor.fetchone()
return row[0] if row[0] is not None else 0
def fast_sync_check(
index_conn: sqlite3.Connection,
chat_conn: sqlite3.Connection,
) -> SyncCheck:
"""Perform a fast check to determine if sync is needed.
    Reads the last indexed ROWID from sync metadata and compares it with
    MAX(ROWID) in chat.db, which stays cheap even with millions of messages.
Args:
index_conn: SQLite connection to the search index database.
chat_conn: SQLite connection to chat.db.
    Returns:
        SyncCheck with a needs_sync flag and an estimated new-message count
        (a ROWID delta, so deletions can make it an overestimate).
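
    Example (sketch; assumes both connections are already open):

        check = fast_sync_check(index_conn, chat_conn)
        if check.needs_sync:
            sync_new_messages(index_conn, chat_conn)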
"""
# Get last indexed from metadata (more reliable than MAX in index)
index_max = get_last_indexed_rowid(index_conn)
chat_max = _get_max_rowid_from_chat(chat_conn)
needs_sync = chat_max > index_max
new_count = max(0, chat_max - index_max) if needs_sync else 0
return SyncCheck(
needs_sync=needs_sync,
new_count=new_count,
index_max_rowid=index_max,
chat_max_rowid=chat_max,
)
def _fetch_chat_participants(
chat_conn: sqlite3.Connection,
chat_id: int,
) -> list[str]:
"""Fetch all participants in a chat.
Args:
chat_conn: SQLite connection to chat.db.
chat_id: The chat ROWID.
Returns:
List of normalized participant identifiers.
"""
cursor = chat_conn.execute(
"""
SELECT h.id
FROM handle h
JOIN chat_handle_join chj ON h.ROWID = chj.handle_id
WHERE chj.chat_id = ?
""",
(chat_id,),
)
return [normalize_handle(row[0]) for row in cursor.fetchall()]
def _fetch_new_messages(
chat_conn: sqlite3.Connection,
after_rowid: int,
limit: int | None = None,
) -> list[sqlite3.Row]:
"""Fetch messages newer than the given ROWID.
Args:
chat_conn: SQLite connection to chat.db.
after_rowid: Only fetch messages with ROWID > this value.
limit: Maximum number of messages to fetch (None for all).
Returns:
List of message rows with associated metadata.
"""
query = """
SELECT
m.ROWID as rowid,
m.text,
m.attributedBody,
m.handle_id,
m.is_from_me,
m.service,
m.date as date_coredata,
h.id as sender_id,
c.ROWID as chat_id,
c.guid as chat_guid,
c.chat_identifier
FROM message m
LEFT JOIN handle h ON m.handle_id = h.ROWID
JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
JOIN chat c ON cmj.chat_id = c.ROWID
WHERE m.ROWID > ?
ORDER BY m.ROWID ASC
"""
    params: tuple[int, ...] = (after_rowid,)
    if limit is not None:
        # Bind LIMIT as a parameter instead of interpolating into the SQL
        query += " LIMIT ?"
        params += (int(limit),)
    cursor = chat_conn.execute(query, params)
return cursor.fetchall()
def _fetch_recent_messages(
chat_conn: sqlite3.Connection,
limit: int = 1000,
) -> list[sqlite3.Row]:
"""Fetch the most recent messages (for quick initial index).
Args:
chat_conn: SQLite connection to chat.db.
limit: Number of recent messages to fetch.
Returns:
List of message rows with associated metadata, ordered by ROWID DESC.
"""
query = """
SELECT
m.ROWID as rowid,
m.text,
m.attributedBody,
m.handle_id,
m.is_from_me,
m.service,
m.date as date_coredata,
h.id as sender_id,
c.ROWID as chat_id,
c.guid as chat_guid,
c.chat_identifier
FROM message m
LEFT JOIN handle h ON m.handle_id = h.ROWID
JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
JOIN chat c ON cmj.chat_id = c.ROWID
ORDER BY m.ROWID DESC
LIMIT ?
"""
cursor = chat_conn.execute(query, (limit,))
return cursor.fetchall()
def _fetch_old_messages(
chat_conn: sqlite3.Connection,
before_rowid: int,
limit: int = 1000,
) -> list[sqlite3.Row]:
"""Fetch messages older than the given ROWID (for backfill).
Args:
chat_conn: SQLite connection to chat.db.
before_rowid: Only fetch messages with ROWID < this value.
limit: Maximum number of messages to fetch.
Returns:
List of message rows with associated metadata, ordered by ROWID DESC.
"""
query = """
SELECT
m.ROWID as rowid,
m.text,
m.attributedBody,
m.handle_id,
m.is_from_me,
m.service,
m.date as date_coredata,
h.id as sender_id,
c.ROWID as chat_id,
c.guid as chat_guid,
c.chat_identifier
FROM message m
LEFT JOIN handle h ON m.handle_id = h.ROWID
JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
JOIN chat c ON cmj.chat_id = c.ROWID
WHERE m.ROWID < ?
ORDER BY m.ROWID DESC
LIMIT ?
"""
cursor = chat_conn.execute(query, (before_rowid, limit))
return cursor.fetchall()
def _index_messages_batch(
index_conn: sqlite3.Connection,
chat_conn: sqlite3.Connection,
messages: list[sqlite3.Row],
batch_size: int = 1000,
) -> tuple[int, int]:
"""Index a batch of messages into the search index.
This is a helper function used by both sync and backfill operations.
It gracefully handles individual message failures and continues processing.
Args:
index_conn: SQLite connection to the search index database.
chat_conn: SQLite connection to chat.db.
messages: List of message rows to index.
batch_size: Number of messages to commit in each batch.
Returns:
Tuple of (synced_count, failed_count).
Raises:
SyncError: If database operations fail persistently.
"""
synced_count = 0
failed_count = 0
# Cache for chat participants (chat_id -> participants list)
participants_cache: dict[int, list[str]] = {}
for msg in messages:
try:
# Parse message text
try:
text = parse_message_text(msg)
except Exception as e:
logger.warning(f"Failed to parse text for message {msg['rowid']}: {e}")
failed_count += 1
continue
            # Messages without text content (media-only, reactions, etc.) are
            # indexed with an empty string so their metadata stays searchable
            if text is None:
                text = ""
# Get sender
try:
sender = msg["sender_id"]
if sender:
sender = normalize_handle(sender)
elif msg["is_from_me"]:
sender = "me"
else:
sender = None
except Exception as e:
logger.warning(f"Failed to determine sender for message {msg['rowid']}: {e}")
sender = None
# Parse chat GUID to determine if group
try:
chat_guid = msg["chat_guid"]
_, is_group, _ = parse_chat_guid(chat_guid) if chat_guid else (None, False, None)
except Exception as e:
logger.warning(f"Failed to parse chat GUID for message {msg['rowid']}: {e}")
is_group = False
            # Get chat participants (cached per chat). chat_id is assigned
            # outside the try block because the INSERT below also uses it.
            chat_id = msg["chat_id"]
            if chat_id not in participants_cache:
                try:
                    participants_cache[chat_id] = _fetch_chat_participants(
                        chat_conn, chat_id
                    )
                except Exception as e:
                    logger.warning(f"Failed to fetch participants for chat {chat_id}: {e}")
                    participants_cache[chat_id] = []
            participants = participants_cache[chat_id]
# Insert into message_index
try:
index_conn.execute(
"""
INSERT OR REPLACE INTO message_index (
rowid, text, handle_id, sender, chat_id, chat_identifier,
is_group, is_from_me, service, date_coredata
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
msg["rowid"],
text,
msg["handle_id"],
sender,
chat_id,
msg["chat_identifier"],
1 if is_group else 0,
msg["is_from_me"],
msg["service"],
msg["date_coredata"],
),
)
except sqlite3.IntegrityError as e:
logger.warning(f"Integrity error inserting message {msg['rowid']}: {e}")
failed_count += 1
continue
except sqlite3.DatabaseError as e:
logger.error(f"Database error inserting message {msg['rowid']}: {e}")
raise SyncError(
f"Database error while indexing messages: {e}",
is_retryable=True,
)
# Insert participants into junction table
try:
for participant in participants:
index_conn.execute(
"""
INSERT OR IGNORE INTO message_participants (rowid, participant)
VALUES (?, ?)
""",
(msg["rowid"], participant),
)
except sqlite3.DatabaseError as e:
logger.warning(f"Failed to insert participants for message {msg['rowid']}: {e}")
# Don't fail the whole sync for participant insertion failures
synced_count += 1
# Commit in batches
if synced_count % batch_size == 0:
try:
index_conn.commit()
logger.debug(f"Indexed batch: {synced_count} messages")
except sqlite3.DatabaseError as e:
logger.error(f"Database error committing batch: {e}")
raise SyncError(
f"Failed to commit message batch: {e}",
is_retryable=True,
)
except SyncError:
# Re-raise SyncError as-is
raise
except Exception as e:
# Catch any other unexpected errors
failed_count += 1
logger.warning(f"Failed to index message {msg['rowid']}: {e}")
continue
# Final commit
if synced_count > 0:
try:
index_conn.commit()
except sqlite3.DatabaseError as e:
logger.error(f"Database error during final commit: {e}")
raise SyncError(
f"Failed to commit indexed messages: {e}",
is_retryable=True,
)
logger.info(f"Indexed batch complete: synced={synced_count}, failed={failed_count}")
return synced_count, failed_count
def sync_new_messages(
index_conn: sqlite3.Connection,
chat_conn: sqlite3.Connection,
limit: int | None = None,
batch_size: int = 1000,
) -> SyncStats:
"""Sync new messages from chat.db to the search index.
This function fetches messages with ROWID greater than the last indexed
ROWID, parses their text content, extracts metadata, and inserts them
into the search index. It handles partial failures gracefully.
Args:
index_conn: SQLite connection to the search index database.
chat_conn: SQLite connection to chat.db.
limit: Maximum number of messages to sync (None for all new).
batch_size: Number of messages to commit in each batch.
Returns:
SyncStats with counts and status ('success', 'partial', 'failed', 'up_to_date').
Raises:
SyncError: If a critical error prevents sync operation.
DatabaseLockedError: If database is locked.
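
    Example (sketch; assumes open connections to both databases):

        stats = sync_new_messages(index_conn, chat_conn, limit=5000)
        if stats.status == "partial":
            logger.warning(f"{stats.messages_failed} messages failed to index")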
"""
try:
last_indexed = get_last_indexed_rowid(index_conn)
logger.debug(f"Syncing messages after rowid {last_indexed}")
except Exception as e:
logger.error(f"Failed to get last indexed rowid: {e}")
raise SyncError(
f"Failed to read sync metadata: {e}",
is_retryable=True,
)
try:
messages = _fetch_new_messages(chat_conn, last_indexed, limit)
except sqlite3.DatabaseError as e:
logger.error(f"Failed to fetch new messages: {e}")
raise SyncError(
f"Failed to fetch new messages from chat.db: {e}",
is_retryable=True,
)
except Exception as e:
logger.error(f"Unexpected error fetching new messages: {e}")
raise SyncError(
f"Unexpected error while fetching messages: {e}",
is_retryable=False,
)
if not messages:
logger.debug("No new messages to sync")
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=last_indexed,
status="up_to_date",
)
logger.info(f"Syncing {len(messages)} new messages")
# Index messages using helper function
try:
synced_count, failed_count = _index_messages_batch(
index_conn, chat_conn, messages, batch_size
)
except SyncError:
# Re-raise SyncError as-is
raise
except Exception as e:
logger.error(f"Unexpected error during message indexing: {e}")
raise SyncError(
f"Unexpected error during sync: {e}",
is_retryable=False,
)
# Update last_indexed_rowid to the max successfully indexed rowid
max_successful_rowid = last_indexed
if synced_count > 0:
# Find the max rowid from indexed messages
try:
for msg in messages:
if msg["rowid"] > max_successful_rowid:
max_successful_rowid = msg["rowid"]
set_last_indexed_rowid(index_conn, max_successful_rowid)
logger.debug(f"Updated last indexed rowid to {max_successful_rowid}")
except Exception as e:
logger.warning(f"Failed to update last indexed rowid: {e}")
# Don't fail the whole sync for metadata updates
# Update last sync timestamp
try:
set_sync_metadata(
index_conn,
"last_sync_timestamp",
datetime.now(timezone.utc).isoformat(),
)
except Exception as e:
logger.warning(f"Failed to update last sync timestamp: {e}")
# Don't fail the whole sync for metadata updates
# Determine status
if failed_count == 0 and synced_count > 0:
status = "success"
elif synced_count > 0 and failed_count > 0:
status = "partial"
elif synced_count == 0 and failed_count > 0:
status = "failed"
else:
status = "up_to_date"
logger.info(
f"Sync complete: synced={synced_count}, failed={failed_count}, status={status}"
)
return SyncStats(
messages_synced=synced_count,
messages_failed=failed_count,
last_indexed_rowid=max_successful_rowid,
status=status,
)
def sync_messages(
index_path: str | None = None,
limit: int | None = None,
batch_size: int = 1000,
) -> SyncStats:
"""High-level sync function with automatic connection management.
This is the main entry point for syncing messages. It handles:
- Opening connections to both databases
- Performing fast sync check
- Syncing new messages if needed
- Graceful handling of database lock errors
Args:
index_path: Optional custom path to search index database.
limit: Maximum number of messages to sync (None for all new).
batch_size: Number of messages to commit in each batch.
Returns:
SyncStats with sync results.
    Note:
        DatabaseLockedError and SyncError are handled internally and surface
        as a 'stale' status instead of propagating to the caller.
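
    Example (sketch; connections are managed internally):

        stats = sync_messages(limit=10000)
        print(stats.status, stats.messages_synced)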
"""
try:
with get_search_index_connection(path=index_path) as index_conn:
try:
with get_connection() as chat_conn:
# Fast check first
try:
check = fast_sync_check(index_conn, chat_conn)
except Exception as e:
logger.warning(f"Fast sync check failed: {e}")
# Return stale status if check fails
last_indexed = get_last_indexed_rowid(index_conn)
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=last_indexed,
status="stale",
)
if not check.needs_sync:
logger.debug("Index is up to date, no sync needed")
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=check.index_max_rowid,
status="up_to_date",
)
logger.info(
f"Sync needed: {check.new_count} new messages "
f"(index: {check.index_max_rowid}, chat: {check.chat_max_rowid})"
)
# Perform sync
try:
return sync_new_messages(
index_conn,
chat_conn,
limit=limit,
batch_size=batch_size,
)
except SyncError as e:
logger.error(f"Sync error: {e}")
                        # Return stale status if sync fails
last_indexed = get_last_indexed_rowid(index_conn)
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=last_indexed,
status="stale",
)
except DatabaseLockedError:
# Return stale status - search can still use existing index
logger.warning(
"chat.db is locked, returning stale status. "
"Search will use existing index."
)
last_indexed = get_last_indexed_rowid(index_conn)
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=last_indexed,
status="stale",
)
except Exception as e:
logger.error(f"Unexpected error during sync: {e}")
        # Return stale status on critical errors
try:
with get_search_index_connection(path=index_path) as index_conn:
last_indexed = get_last_indexed_rowid(index_conn)
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=last_indexed,
status="stale",
)
except Exception as e2:
logger.error(f"Failed to get last indexed rowid: {e2}")
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=0,
status="stale",
)
def quick_initial_index(
index_path: str | None = None,
limit: int = 1000,
batch_size: int = 1000,
) -> SyncStats:
"""Perform a quick initial index of the most recent messages.
This function is called on first-run to quickly index the most recent
messages so that search results are available immediately. After this
completes, start_background_backfill() should be called to index the
remaining older messages in the background.
Args:
index_path: Optional custom path to search index database.
limit: Number of recent messages to index (default: 1000).
batch_size: Number of messages to commit in each batch.
Returns:
SyncStats with indexing results.
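
    Example (first-run sketch; assumes is_first_run() was already checked
    with an open index connection):

        stats = quick_initial_index(limit=1000)
        if stats.status in ("success", "partial"):
            start_background_backfill()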
"""
with get_search_index_connection(path=index_path) as index_conn:
try:
with get_connection() as chat_conn:
# Fetch recent messages
messages = _fetch_recent_messages(chat_conn, limit)
if not messages:
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=0,
status="up_to_date",
)
# Index messages using helper function
synced_count, failed_count = _index_messages_batch(
index_conn, chat_conn, messages, batch_size
)
                # Update last_indexed_rowid to the max rowid from indexed
                # messages (messages is non-empty past the early return above)
                max_rowid = max(msg["rowid"] for msg in messages)
                min_rowid = min(msg["rowid"] for msg in messages)
if synced_count > 0:
set_last_indexed_rowid(index_conn, max_rowid)
# Set backfill status to pending
# We need to backfill all messages with ROWID < min_rowid
set_backfill_status(
conn=index_conn,
status="pending",
progress_rowid=min_rowid,
total=min_rowid - 1, # Approximate total to backfill
count=0,
)
logger.info(
f"Quick initial index complete: {synced_count} messages indexed "
f"(rowid range: {min_rowid}-{max_rowid}). "
f"Backfill needed for {min_rowid - 1} older messages."
)
# Update last sync timestamp
set_sync_metadata(
index_conn,
"last_sync_timestamp",
datetime.now(timezone.utc).isoformat(),
)
# Determine status
if failed_count == 0 and synced_count > 0:
status = "success"
elif synced_count > 0 and failed_count > 0:
status = "partial"
elif synced_count == 0 and failed_count > 0:
status = "failed"
else:
status = "up_to_date"
return SyncStats(
messages_synced=synced_count,
messages_failed=failed_count,
last_indexed_rowid=max_rowid,
status=status,
)
except DatabaseLockedError:
logger.warning("chat.db is locked during quick initial index")
return SyncStats(
messages_synced=0,
messages_failed=0,
last_indexed_rowid=0,
status="stale",
)
# Global state for background backfill tracking
_backfill_state: dict[str, Any] = {
"running": False,
"thread": None,
}
_backfill_lock = threading.Lock()
def _background_backfill_worker(index_path: str | None = None) -> None:
"""Worker function that performs the background backfill.
This runs in a background thread to index older messages that were
skipped during the quick initial index.
"""
try:
with get_search_index_connection(path=index_path) as index_conn:
# Get backfill status
backfill_status = get_backfill_status(index_conn)
if backfill_status.status == "complete":
logger.info("Backfill already complete, nothing to do")
return
# Set status to in_progress
set_backfill_status(index_conn, "in_progress")
# Get the current backfill progress rowid
progress_rowid = backfill_status.backfill_progress_rowid
total_backfilled = backfill_status.messages_backfilled
logger.info(
f"Starting background backfill from rowid {progress_rowid} "
f"(already backfilled: {total_backfilled})"
)
batch_size = 1000
batches_processed = 0
with get_connection() as chat_conn:
while progress_rowid > 0:
# Fetch a batch of old messages
messages = _fetch_old_messages(chat_conn, progress_rowid, batch_size)
if not messages:
# No more messages to backfill
break
# Index the batch
synced_count, failed_count = _index_messages_batch(
index_conn, chat_conn, messages, batch_size
)
total_backfilled += synced_count
batches_processed += 1
                        # Update progress to the lowest ROWID in this batch
                        # (messages is non-empty past the break above)
                        min_rowid = min(msg["rowid"] for msg in messages)
                        progress_rowid = min_rowid
set_backfill_status(
conn=index_conn,
status="in_progress",
progress_rowid=progress_rowid,
count=total_backfilled,
)
logger.debug(
f"Backfill batch {batches_processed}: "
f"{synced_count} messages indexed, "
f"progress_rowid: {progress_rowid}"
)
# If we got fewer messages than the batch size, we're done
if len(messages) < batch_size:
break
# Mark backfill as complete
set_backfill_status(index_conn, "complete", count=total_backfilled)
logger.info(
f"Background backfill complete: {total_backfilled} total messages backfilled"
)
except Exception as e:
logger.error(f"Background backfill error: {e}")
try:
with get_search_index_connection(path=index_path) as index_conn:
set_backfill_status(index_conn, "error", error=str(e))
except Exception:
pass
finally:
with _backfill_lock:
_backfill_state["running"] = False
_backfill_state["thread"] = None
def start_background_backfill(index_path: str | None = None) -> dict[str, str]:
"""Start a background thread to backfill older messages.
This should be called after quick_initial_index() completes to index
the remaining older messages that were skipped.
Args:
index_path: Optional custom path to search index database.
Returns:
Dictionary with backfill status.
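
    Example (sketch; poll get_backfill_status() afterwards for progress):

        result = start_background_backfill()
        if result["status"] == "started":
            print(result["message"])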
"""
with _backfill_lock:
if _backfill_state["running"]:
return {
"status": "already_running",
"message": "Background backfill already in progress",
}
# Check if backfill is needed
with get_search_index_connection(path=index_path) as index_conn:
backfill_status = get_backfill_status(index_conn)
if backfill_status.status == "complete":
return {
"status": "complete",
"message": "Backfill already complete",
}
# Start background thread
_backfill_state["running"] = True
thread = threading.Thread(
target=_background_backfill_worker,
args=(index_path,),
daemon=True,
)
thread.start()
_backfill_state["thread"] = thread
return {
"status": "started",
"message": "Background backfill started",
"note": "Use get_backfill_status() to check progress",
}