"""Conversation tools for iMessage."""
from typing import Optional
from ..db import (
get_connection,
parse_message_text,
coredata_to_datetime,
normalize_handle,
)
from ..db.contacts import get_contact_cache
from ..db.queries import format_datetime_iso, parse_chat_guid
from ..constants import DEFAULT_PAGINATION_LIMIT, DEFAULT_MESSAGE_LIMIT
from ..utils import apply_pagination, ensure_file_uri
async def list_conversations(
limit: int = DEFAULT_PAGINATION_LIMIT,
offset: int = 0,
) -> dict:
"""List all conversations with metadata.
Returns conversations sorted by most recent message, including:
- Participant information
- Contact names for participants
- Message count
- Last message preview
- Whether it's a group chat
Args:
limit: Maximum number of conversations to return (default: 20)
offset: Number of conversations to skip (default: 0)
Returns:
Dictionary with conversations list and pagination metadata.
"""
with get_connection() as conn:
# Get conversations with message counts and last message info
cursor = conn.execute(
"""
SELECT
c.ROWID as chat_rowid,
c.guid as chat_guid,
c.chat_identifier,
c.display_name,
COUNT(DISTINCT cmj.message_id) as message_count,
MAX(m.date) as last_message_date,
(
SELECT m2.ROWID
FROM message m2
JOIN chat_message_join cmj2 ON m2.ROWID = cmj2.message_id
WHERE cmj2.chat_id = c.ROWID
ORDER BY m2.date DESC
LIMIT 1
) as last_message_rowid
FROM chat c
LEFT JOIN chat_message_join cmj ON c.ROWID = cmj.chat_id
LEFT JOIN message m ON cmj.message_id = m.ROWID
GROUP BY c.ROWID
ORDER BY last_message_date DESC
LIMIT ? OFFSET ?
""",
(limit + 1, offset), # Fetch one extra to detect hasMore
)
conversations = []
rows = cursor.fetchall()
for row in rows[:limit]:
service, is_group, identifier = parse_chat_guid(row["chat_guid"])
# Get last message preview
last_message_preview = None
if row["last_message_rowid"]:
msg_cursor = conn.execute(
"SELECT text, attributedBody FROM message WHERE ROWID = ?",
(row["last_message_rowid"],),
)
msg_row = msg_cursor.fetchone()
if msg_row:
text = parse_message_text(msg_row)
if text:
last_message_preview = (
text[:100] + "..." if len(text) > 100 else text
)
# Get participants for this chat
participants_cursor = conn.execute(
"""
SELECT h.id
FROM chat_handle_join chj
JOIN handle h ON chj.handle_id = h.ROWID
WHERE chj.chat_id = ?
""",
(row["chat_rowid"],),
)
participants = list(set(r["id"] for r in participants_cursor))
last_date = coredata_to_datetime(row["last_message_date"])
conversations.append(
{
"chat_id": row["chat_rowid"],
"chat_identifier": row["chat_identifier"],
"display_name": row["display_name"],
"service": service,
"is_group": is_group,
"participants": participants,
"message_count": row["message_count"],
"last_message_date": format_datetime_iso(last_date),
"last_message_date_raw": row["last_message_date"],
"last_message_preview": last_message_preview,
}
)
# Get total count
count_cursor = conn.execute("SELECT COUNT(*) FROM chat")
total = count_cursor.fetchone()[0]
has_more = len(rows) > limit
# Enrich conversations with contact names
cache = get_contact_cache()
for conv in conversations:
if not conv.get("is_group"):
# For 1-on-1 chats, resolve the chat identifier
chat_identifier = conv.get("chat_identifier")
if chat_identifier:
conv["contact_name"] = await cache.resolve_name(chat_identifier)
else:
conv["contact_name"] = None
else:
# For group chats, resolve all participant handles
participants = conv.get("participants", [])
if participants:
names = await cache.resolve_names_batch(participants)
conv["participant_names"] = [names.get(p) for p in participants]
else:
conv["participant_names"] = []
return {
"conversations": conversations,
"pagination": {
"total": total,
"offset": offset,
"limit": limit,
"has_more": has_more,
"next_offset": offset + limit if has_more else None,
},
}
async def get_conversation_messages(
chat_id: Optional[int] = None,
contact: Optional[str] = None,
limit: int = DEFAULT_MESSAGE_LIMIT,
offset: int = 0,
) -> dict:
"""Get messages from a specific conversation.
You can specify the conversation by either chat_id or contact identifier.
If contact is provided, it will be normalized and used to find the chat.
Messages are enriched with contact names for senders.
Args:
chat_id: Database row ID of the chat (from list_conversations)
contact: Phone number or email to find the conversation
limit: Maximum number of messages to return (default: 50)
offset: Number of messages to skip (default: 0)
Returns:
Dictionary with messages list, conversation info, and pagination.
"""
if chat_id is None and contact is None:
return {
"error": "Either chat_id or contact must be provided",
"messages": [],
}
with get_connection() as conn:
# If contact provided, find the chat
if contact and chat_id is None:
normalized = normalize_handle(contact)
chat_cursor = conn.execute(
"""
SELECT c.ROWID
FROM chat c
WHERE c.chat_identifier LIKE ?
LIMIT 1
""",
(f"%{normalized}%",),
)
result = chat_cursor.fetchone()
if result:
chat_id = result[0]
else:
return {
"error": f"No conversation found for contact: {contact}",
"messages": [],
}
# Get chat info
chat_cursor = conn.execute(
"SELECT guid, chat_identifier, display_name FROM chat WHERE ROWID = ?",
(chat_id,),
)
chat_info = chat_cursor.fetchone()
if not chat_info:
return {
"error": f"Chat not found: {chat_id}",
"messages": [],
}
service, is_group, identifier = parse_chat_guid(chat_info["guid"])
# Get messages
cursor = conn.execute(
"""
SELECT
m.ROWID,
m.guid,
m.text,
m.attributedBody,
m.date,
m.is_from_me,
m.service,
h.id as sender_handle
FROM message m
JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
LEFT JOIN handle h ON m.handle_id = h.ROWID
WHERE cmj.chat_id = ?
ORDER BY m.date DESC
LIMIT ? OFFSET ?
""",
(chat_id, limit + 1, offset),
)
messages = []
rows = cursor.fetchall()
for row in rows[:limit]:
text = parse_message_text(row)
msg_date = coredata_to_datetime(row["date"])
# Get attachments for this message
att_cursor = conn.execute(
"""
SELECT
a.ROWID,
a.guid,
a.filename,
a.mime_type,
a.uti,
a.total_bytes
FROM attachment a
JOIN message_attachment_join maj ON a.ROWID = maj.attachment_id
WHERE maj.message_id = ?
""",
(row["ROWID"],),
)
attachments = []
for att in att_cursor:
attachments.append(
{
"guid": att["guid"],
"filename": att["filename"],
"mime_type": att["mime_type"],
"uti": att["uti"],
"total_bytes": att["total_bytes"],
"file_uri": (
ensure_file_uri(att["filename"])
if att["filename"]
else None
),
}
)
messages.append(
{
"rowid": row["ROWID"],
"guid": row["guid"],
"text": text,
"date": format_datetime_iso(msg_date),
"date_raw": row["date"],
"is_from_me": bool(row["is_from_me"]),
"service": row["service"],
"sender": row["sender_handle"] if not row["is_from_me"] else "me",
"attachments": attachments,
}
)
# Get total count
count_cursor = conn.execute(
"""
SELECT COUNT(*)
FROM chat_message_join
WHERE chat_id = ?
""",
(chat_id,),
)
total = count_cursor.fetchone()[0]
has_more = len(rows) > limit
# Enrich messages with contact names
cache = get_contact_cache()
# Collect all unique sender handles (excluding "me")
sender_handles = list(set(msg["sender"] for msg in messages if msg["sender"] != "me"))
# Batch resolve all sender names
if sender_handles:
sender_names = await cache.resolve_names_batch(sender_handles)
# Add contact_name field to each message
for msg in messages:
if msg["sender"] == "me":
msg["contact_name"] = None
else:
msg["contact_name"] = sender_names.get(msg["sender"])
else:
# All messages are from "me", just set contact_name to None
for msg in messages:
msg["contact_name"] = None
return {
"chat": {
"chat_id": chat_id,
"chat_identifier": chat_info["chat_identifier"],
"display_name": chat_info["display_name"],
"service": service,
"is_group": is_group,
},
"messages": messages,
"pagination": {
"total": total,
"offset": offset,
"limit": limit,
"has_more": has_more,
"next_offset": offset + limit if has_more else None,
},
}