# We provide all the information about MCP servers via our MCP API.
# curl -X GET 'https://glama.ai/api/mcp/v1/servers/jonmmease/jons-mcp-imessage'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Search index management tools for iMessage search."""
import os
import threading
from datetime import datetime
from typing import Any, Optional
from ..db import (
get_connection,
get_search_index_connection,
get_search_index_path,
get_sync_metadata,
set_sync_metadata,
parse_message_text,
normalize_handle,
CURRENT_SCHEMA_VERSION,
)
async def search_index_status() -> dict:
    """Get the current status of the search index.

    Collects row counts, sync metadata, schema versions, and on-disk size
    for the search index database, plus backfill progress whenever a
    backfill is still pending, running, or errored.

    Returns:
        Dictionary with search index status and metadata.
    """
    from ..search.sync import get_backfill_status

    db_path = get_search_index_path()

    # Size on disk; 0.0 when the index file has not been created yet.
    size_mb = 0.0
    if os.path.exists(db_path):
        size_mb = os.path.getsize(db_path) / (1024 * 1024)

    with get_search_index_connection() as conn:
        indexed_count = conn.execute(
            "SELECT COUNT(*) FROM message_index"
        ).fetchone()[0]
        embedded_count = conn.execute(
            "SELECT COUNT(*) FROM message_index WHERE embedding IS NOT NULL"
        ).fetchone()[0]

        # Sync bookkeeping is stored as strings in the metadata table.
        status: dict = {
            "total_indexed": indexed_count,
            "total_embedded": embedded_count,
            "last_indexed_rowid": int(
                get_sync_metadata(conn, "last_indexed_rowid") or "0"
            ),
            "last_embedded_rowid": int(
                get_sync_metadata(conn, "last_embedded_rowid") or "0"
            ),
            "schema_version": int(get_sync_metadata(conn, "schema_version") or "0"),
            "current_schema_version": CURRENT_SCHEMA_VERSION,
            "last_sync_timestamp": get_sync_metadata(conn, "last_sync_timestamp")
            or "",
            "database_path": db_path,
            "database_size_mb": round(size_mb, 2),
        }

        backfill = get_backfill_status(conn)

    # Only surface backfill details while there is still work (or an error).
    if backfill.status != "complete":
        status["backfill"] = {
            "status": backfill.status,
            "progress_rowid": backfill.backfill_progress_rowid,
            "total_to_backfill": backfill.total_to_backfill,
            "messages_backfilled": backfill.messages_backfilled,
        }
        if backfill.error:
            status["backfill"]["error"] = backfill.error

    return status
# Global state for background rebuild tracking.
# Shared by rebuild_search_index / _rebuild_index_worker / get_rebuild_progress;
# every read and write of _rebuild_state must hold _rebuild_lock.
_rebuild_state: dict[str, Any] = {
    "in_progress": False,  # True while a rebuild (sync or background) is running
    "total_messages": 0,  # message count in chat.db at rebuild start
    "indexed_messages": 0,  # rows indexed so far, updated per batch
    "start_time": None,  # ISO timestamp set when the worker starts
    "error": None,  # str(exception) if the last rebuild failed, else None
}
_rebuild_lock = threading.Lock()
def _rebuild_index_worker() -> None:
    """Worker function that performs the actual index rebuild.

    Clears the search index, re-copies every message row from chat.db in
    batches, then records sync metadata. Runs either synchronously or in a
    background thread; progress is published via the module-level
    ``_rebuild_state`` dict, guarded by ``_rebuild_lock``.
    """
    try:
        with _rebuild_lock:
            _rebuild_state["start_time"] = datetime.now().isoformat()
            _rebuild_state["error"] = None

        _clear_search_index()

        # Total message count drives the progress percentage reported by
        # get_rebuild_progress().
        with get_connection() as chat_conn:
            total = chat_conn.execute("SELECT COUNT(*) FROM message").fetchone()[0]
        with _rebuild_lock:
            _rebuild_state["total_messages"] = total

        # Copy messages in batches. NOTE: the LEFT JOIN against
        # chat_message_join can yield multiple rows for one message (one per
        # chat it appears in); INSERT OR REPLACE keeps the last row per
        # rowid, so the per-batch row count is an upper bound on distinct
        # messages indexed.
        batch_size = 1000
        offset = 0
        indexed = 0
        while True:
            rows = _fetch_message_batch(batch_size, offset)
            if not rows:
                break
            _index_batch(rows)
            offset += batch_size
            # BUGFIX: count the rows actually fetched rather than assigning
            # `offset`, which overshoots on the final partial batch.
            indexed += len(rows)
            with _rebuild_lock:
                _rebuild_state["indexed_messages"] = indexed

        # Record how far we got so incremental sync can resume from here.
        with get_search_index_connection() as index_conn:
            cursor = index_conn.execute("SELECT MAX(rowid) FROM message_index")
            max_rowid = cursor.fetchone()[0] or 0
            set_sync_metadata(index_conn, "last_indexed_rowid", str(max_rowid))
            set_sync_metadata(
                index_conn, "last_sync_timestamp", datetime.now().isoformat()
            )

        with _rebuild_lock:
            _rebuild_state["in_progress"] = False
    except Exception as e:
        # Surface the failure through the shared state so callers polling
        # progress can see it; never let a background thread die silently.
        with _rebuild_lock:
            _rebuild_state["error"] = str(e)
            _rebuild_state["in_progress"] = False


def _clear_search_index() -> None:
    """Delete all indexed rows and reset sync/backfill metadata."""
    with get_search_index_connection() as index_conn:
        index_conn.execute("DELETE FROM message_index")
        index_conn.execute("DELETE FROM message_participants")
        set_sync_metadata(index_conn, "last_indexed_rowid", "0")
        set_sync_metadata(index_conn, "last_embedded_rowid", "0")
        # Reset backfill status since a full rebuild supersedes any pending
        # first-run backfill.
        set_sync_metadata(index_conn, "backfill_status", "complete")
        set_sync_metadata(index_conn, "backfill_progress_rowid", "0")
        set_sync_metadata(index_conn, "backfill_total", "0")
        set_sync_metadata(index_conn, "backfill_count", "0")
        set_sync_metadata(index_conn, "backfill_error", "")


def _fetch_message_batch(batch_size: int, offset: int) -> list:
    """Fetch one page of messages (joined with sender/chat info) from chat.db.

    The secondary ORDER BY key makes the row order deterministic across
    queries. OFFSET pagination requires a total ordering: with only
    ``ORDER BY m.ROWID``, ties from the chat_message_join LEFT JOIN could be
    returned in a different order on each query, skipping or duplicating
    rows across pages.
    """
    with get_connection() as chat_conn:
        cursor = chat_conn.execute(
            """
            SELECT
                m.ROWID,
                m.text,
                m.attributedBody,
                m.handle_id,
                m.is_from_me,
                m.service,
                m.date,
                h.id as sender_handle,
                c.ROWID as chat_id,
                c.chat_identifier,
                c.display_name
            FROM message m
            LEFT JOIN handle h ON m.handle_id = h.ROWID
            LEFT JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
            LEFT JOIN chat c ON cmj.chat_id = c.ROWID
            ORDER BY m.ROWID, c.ROWID
            LIMIT ? OFFSET ?
            """,
            (batch_size, offset),
        )
        return cursor.fetchall()


def _index_batch(rows: list) -> None:
    """Insert one batch of chat.db rows into the search index (no embeddings)."""
    with get_search_index_connection() as index_conn:
        for row in rows:
            text = parse_message_text(row)
            sender = (
                normalize_handle(row["sender_handle"])
                if row["sender_handle"]
                else None
            )
            index_conn.execute(
                """
                INSERT OR REPLACE INTO message_index
                (rowid, text, handle_id, sender, chat_id, chat_identifier,
                 is_group, is_from_me, service, date_coredata, embedding)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)
                """,
                (
                    row["ROWID"],
                    text,
                    row["handle_id"],
                    sender,
                    row["chat_id"],
                    row["chat_identifier"],
                    0,  # is_group - would need to query chat table
                    row["is_from_me"],
                    row["service"],
                    row["date"],
                ),
            )
        index_conn.commit()
async def rebuild_search_index(background: bool = False) -> dict:
    """Rebuild the entire search index from scratch.

    This will:
    1. Clear the existing search index
    2. Re-index all messages from chat.db
    3. Update sync metadata

    Note: This does NOT generate embeddings. Use a separate tool for that.

    Args:
        background: If True, runs the rebuild in a background thread and returns
            immediately with progress tracking info. If False, blocks until
            complete.

    Returns:
        Dictionary with rebuild status and progress information.
    """
    with _rebuild_lock:
        # Refuse to start a second rebuild; report the one already running.
        if _rebuild_state["in_progress"]:
            return {
                "status": "in_progress",
                "message": "Rebuild already in progress",
                "progress": {
                    "total_messages": _rebuild_state["total_messages"],
                    "indexed_messages": _rebuild_state["indexed_messages"],
                    "start_time": _rebuild_state["start_time"],
                },
            }
        # Claim the rebuild slot and reset counters before releasing the lock.
        _rebuild_state.update(
            in_progress=True,
            total_messages=0,
            indexed_messages=0,
            error=None,
        )

    if background:
        # Hand off to a daemon thread so this call returns immediately.
        threading.Thread(target=_rebuild_index_worker, daemon=True).start()
        return {
            "status": "started",
            "message": "Index rebuild started in background",
            "background": True,
            "note": "Use search_index_status() to check progress",
        }

    # Synchronous path: run the worker inline, then report the outcome.
    _rebuild_index_worker()
    with _rebuild_lock:
        failure = _rebuild_state["error"]
        indexed = _rebuild_state["indexed_messages"]
    if failure:
        return {
            "status": "error",
            "message": "Index rebuild failed",
            "error": failure,
        }
    return {
        "status": "completed",
        "message": "Index rebuild completed successfully",
        "indexed_messages": indexed,
    }
async def get_rebuild_progress() -> dict:
    """Get the current progress of a background index rebuild.

    Returns:
        Dictionary with rebuild progress information, or status indicating
        no rebuild is in progress.
    """
    with _rebuild_lock:
        if not _rebuild_state["in_progress"]:
            return {
                "status": "idle",
                "message": "No rebuild in progress",
            }

        total = _rebuild_state["total_messages"]
        done = _rebuild_state["indexed_messages"]
        # Guard against a zero/unset total (worker may not have counted yet).
        if isinstance(total, int) and total > 0 and isinstance(done, int):
            percent = (done / total) * 100
        else:
            percent = 0.0

        return {
            "status": "in_progress",
            "total_messages": total,
            "indexed_messages": done,
            "progress_percent": round(percent, 2),
            "start_time": _rebuild_state["start_time"],
            "error": _rebuild_state["error"],
        }
async def get_backfill_progress() -> dict:
    """Get the current progress of the background backfill operation.

    This reports on the background backfill that runs after the quick
    initial index on first-run, indexing older messages that the quick
    initial index skipped.

    Returns:
        Dictionary with backfill progress information including:
        - status: 'pending', 'in_progress', 'complete', or 'error'
        - backfill_progress_rowid: Lowest ROWID that has been backfilled
        - total_to_backfill: Total number of messages to backfill
        - messages_backfilled: Number of messages backfilled so far
        - progress_percent: Percentage complete (if in progress)
        - error: Error message (if status is 'error')
    """
    from ..search.sync import get_backfill_status

    with get_search_index_connection() as conn:
        info = get_backfill_status(conn)

    progress = {
        "status": info.status,
        "backfill_progress_rowid": info.backfill_progress_rowid,
        "total_to_backfill": info.total_to_backfill,
        "messages_backfilled": info.messages_backfilled,
    }

    # Percentage is only meaningful while work remains and a total is known.
    if info.status == "in_progress" and info.total_to_backfill > 0:
        fraction = info.messages_backfilled / info.total_to_backfill
        progress["progress_percent"] = round(fraction * 100, 2)

    if info.error:
        progress["error"] = info.error

    return progress