"""get_messages tool implementation."""
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Any
from ..db import get_db_connection, apple_to_datetime, datetime_to_apple, DB_PATH
from ..contacts import ContactResolver
from ..phone import normalize_to_e164, format_phone_display
from ..queries import get_chat_participants, get_messages_for_chat, get_reactions_for_messages, get_attachments_for_messages
from ..parsing import get_message_text, get_reaction_type, reaction_to_emoji, extract_links
from ..time_utils import parse_time_input
from ..models import Participant, generate_display_name
from ..suggestions import get_message_suggestions
from ..enrichment import get_image_metadata, process_video, process_audio, enrich_links
# Default window (in hours) for the "unanswered" filter: how long after one of
# my messages we look for a reply before calling it unanswered.
DEFAULT_UNANSWERED_HOURS = 24
def _format_file_size(bytes_size: Optional[int]) -> str:
"""Format file size in human-readable format."""
if bytes_size is None or bytes_size <= 0:
return "0 B"
size = float(bytes_size)
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024:
if unit == 'B':
return f"{int(size)} {unit}"
return f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} TB"
# Session detection: a gap of 4+ hours between consecutive messages starts a
# new conversation "session".
SESSION_GAP_HOURS = 4
# The same gap expressed in nanoseconds, matching the unit of message.date
# (Apple epoch nanoseconds) so it can be compared to raw date deltas.
SESSION_GAP_NS = SESSION_GAP_HOURS * 60 * 60 * 1_000_000_000
# Maximum number of media items (images + videos) to return metadata for per request.
MAX_MEDIA = 10
# Maximum number of links to enrich per request.
MAX_LINKS = 10
def _get_attachment_type(mime_type: Optional[str], uti: Optional[str]) -> str:
"""Determine attachment type from MIME type or UTI.
Args:
mime_type: MIME type string (e.g., "image/jpeg")
uti: Uniform Type Identifier (e.g., "public.jpeg")
Returns:
One of: "image", "video", "audio", "pdf", "other"
"""
if not mime_type and not uti:
return "other"
mime = (mime_type or "").lower()
uti_str = (uti or "").lower()
if "image" in mime or "image" in uti_str or "jpeg" in uti_str or "png" in uti_str or "heic" in uti_str:
return "image"
elif "video" in mime or "movie" in uti_str or "video" in uti_str:
return "video"
elif "audio" in mime or "audio" in uti_str:
return "audio"
elif "pdf" in mime or "pdf" in uti_str:
return "pdf"
else:
return "other"
def _looks_like_question(text: str) -> bool:
"""Check if a message appears to expect a response.
Args:
text: The message text to check
Returns:
True if the message looks like it expects a response
"""
if not text:
return False
text_lower = text.lower().strip()
# Contains question mark
if '?' in text:
return True
# Ends with common question/request patterns
question_endings = [
"what do you think",
"let me know",
"thoughts",
"can you",
"could you",
"would you",
"will you",
"please",
"lmk",
]
for ending in question_endings:
if text_lower.endswith(ending):
return True
return False
def _has_reply_within_window(conn, chat_id: int, message_date: int, hours: int = DEFAULT_UNANSWERED_HOURS) -> bool:
    """Check if there's a reply from someone else within the specified window.

    Args:
        conn: Database connection
        chat_id: Chat ROWID
        message_date: Apple timestamp of the original message
        hours: Number of hours to check for a reply (default 24)

    Returns:
        True if there is a reply within the window
    """
    # Upper bound of the reply window in Apple-epoch nanoseconds.
    window_end = message_date + hours * 60 * 60 * 1_000_000_000
    # Only genuine incoming messages count: is_from_me = 0 excludes my own
    # follow-ups, associated_message_type = 0 excludes reactions/tapbacks.
    row = conn.execute("""
        SELECT 1 FROM message m
        JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
        WHERE cmj.chat_id = ?
        AND m.date > ?
        AND m.date <= ?
        AND m.is_from_me = 0
        AND m.associated_message_type = 0
        LIMIT 1
    """, (chat_id, message_date, window_end)).fetchone()
    return row is not None
def _assign_sessions(
messages: list[dict[str, Any]], message_rows: list[dict]
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""Assign session IDs to messages and build sessions summary.
A new session starts when there's a gap of 4+ hours between messages.
Args:
messages: List of formatted message dicts (in DESC order - most recent first)
message_rows: List of raw message rows with 'date' field (same order as messages)
Returns:
Tuple of (messages with session info, sessions summary list)
"""
if not messages:
return messages, []
sessions = []
current_session = 1
session_message_count = 0
session_start_ts = None
# Messages are in DESC order (most recent first)
# Reverse to process oldest first for session assignment
reversed_indices = list(range(len(messages) - 1, -1, -1))
for i, idx in enumerate(reversed_indices):
msg = messages[idx]
row = message_rows[idx]
msg_date = row['date'] if row['date'] else 0
# Check if new session (gap from previous)
if i > 0:
prev_idx = reversed_indices[i - 1]
prev_date = message_rows[prev_idx]['date'] if message_rows[prev_idx]['date'] else 0
gap = msg_date - prev_date
if gap >= SESSION_GAP_NS:
# Save previous session
sessions.append({
"session_id": f"session_{current_session}",
"started": session_start_ts,
"message_count": session_message_count
})
current_session += 1
session_message_count = 0
# Mark this message as session start
msg["session_start"] = True
msg["session_gap_hours"] = round(gap / (60 * 60 * 1_000_000_000), 1)
else:
msg["session_start"] = False
else:
# First message (oldest)
msg["session_start"] = True
msg["session_id"] = f"session_{current_session}"
session_message_count += 1
if msg["session_start"]:
session_start_ts = apple_to_datetime(msg_date).isoformat() if msg_date else None
# Save final session
sessions.append({
"session_id": f"session_{current_session}",
"started": session_start_ts,
"message_count": session_message_count
})
# Reverse sessions so most recent is first
sessions.reverse()
return messages, sessions
def get_messages_impl(
    chat_id: Optional[str] = None,
    participants: Optional[list[str]] = None,
    since: Optional[str] = None,
    before: Optional[str] = None,
    limit: int = 50,
    from_person: Optional[str] = None,
    contains: Optional[str] = None,
    has: Optional[str] = None,
    include_reactions: bool = True,
    cursor: Optional[str] = None,
    unanswered: bool = False,
    unanswered_hours: int = DEFAULT_UNANSWERED_HOURS,
    session: Optional[str] = None,
    db_path: str = DB_PATH,
) -> dict[str, Any]:
    """
    Get messages from a chat with flexible filtering.

    Either chat_id or participants must be provided.

    Args:
        chat_id: Chat identifier (e.g., "chat1" or "chat123")
        participants: Alternative - find chat by participant handles
        since: Time bound (ISO, relative like "24h", or natural like "yesterday")
        before: Upper time bound
        limit: Maximum messages to return (default 50)
        from_person: Filter to messages from specific person (or "me")
        contains: Text search within messages
        has: Filter by content type ("links", "attachments", etc.)
        include_reactions: Include reaction data (default True)
        cursor: Pagination cursor for continuing retrieval
        unanswered: Only return messages from me that didn't receive a reply
        unanswered_hours: Window in hours to check for replies (default 24)
        session: Filter to specific session ID (e.g., "session_1", "session_2")
        db_path: Path to chat.db (for testing)

    Returns:
        Dict with chat info, people map, messages, and sessions summary
    """
    # NOTE(review): `has` and `cursor` are accepted but never read anywhere in
    # this body — presumably reserved for future filtering/pagination; confirm
    # before relying on them (the response always returns "cursor": None).
    if not chat_id and not participants:
        return {
            "error": "validation_error",
            "message": "Either chat_id or participants must be provided",
        }
    resolver = ContactResolver()
    if resolver.is_available:
        resolver.initialize()  # Explicitly initialize to trigger auth check
    try:
        with get_db_connection(db_path) as conn:
            # Resolve participants to chat_id if provided
            if participants and not chat_id:
                from .find_chat import _find_chats_by_handle_groups
                # Build handle groups for each participant: each group is the
                # set of handles that could belong to that one person.
                handle_groups = []
                for p in participants:
                    group_handles = []
                    if p.startswith('+'):
                        # Already an E.164-looking handle; use as-is.
                        group_handles.append(p)
                    else:
                        normalized = normalize_to_e164(p)
                        if normalized:
                            group_handles.append(normalized)
                        if resolver.is_available:
                            # Also match by contact name (e.g., "John") and
                            # collect every handle on that contact card.
                            matches = resolver.search_by_name(p)
                            for handle, _ in matches:
                                if handle not in group_handles:
                                    group_handles.append(handle)
                    if group_handles:
                        handle_groups.append(group_handles)
                if handle_groups:
                    matching_chats = _find_chats_by_handle_groups(conn, handle_groups)
                    target_count = len(participants) + 1  # +1 for me
                    # Filter to exact participant count matches
                    exact_matches = [c for c in matching_chats
                                     if c.get('participant_count') == target_count]
                    if len(exact_matches) == 1:
                        # Single exact match - use it
                        chat_id = f"chat{exact_matches[0]['id']}"
                    elif len(exact_matches) > 1:
                        # Multiple exact matches - use most recent (already sorted by recency)
                        chat_id = f"chat{exact_matches[0]['id']}"
                    elif len(matching_chats) == 1:
                        # Single partial match - use it
                        chat_id = f"chat{matching_chats[0]['id']}"
                    elif matching_chats:
                        # Ambiguous - return disambiguation options
                        return {
                            "error": "ambiguous_participants",
                            "message": f"Multiple chats found with participants: {participants}",
                            "candidates": [
                                {
                                    "chat_id": f"chat{c['id']}",
                                    "name": c.get('display_name') or "(Unnamed)",
                                    "participant_count": c.get('participant_count', 0),
                                }
                                for c in matching_chats[:5]
                            ],
                            "suggestion": f"Did you mean the {matching_chats[0].get('participant_count', 0)}-person chat or a different one?"
                        }
                    else:
                        return {
                            "error": "chat_not_found",
                            "message": f"No chat found with participants: {participants}"
                        }
                else:
                    return {
                        "error": "invalid_participants",
                        "message": f"Could not resolve any handles for participants: {participants}"
                    }
            # Resolve chat_id to numeric ID
            numeric_chat_id = None
            if chat_id:
                # Extract numeric ID from "chatXXX" format
                if chat_id.startswith("chat"):
                    try:
                        numeric_chat_id = int(chat_id[4:])
                    except ValueError:
                        pass
                if numeric_chat_id is None:
                    # Try to find by GUID
                    cursor_obj = conn.execute(
                        "SELECT ROWID FROM chat WHERE guid LIKE ?",
                        (f"%{chat_id}%",)
                    )
                    row = cursor_obj.fetchone()
                    if row:
                        numeric_chat_id = row[0]
            if numeric_chat_id is None:
                return {
                    "error": "chat_not_found",
                    "message": f"Chat not found: {chat_id}",
                }
            # Get chat info
            chat_cursor = conn.execute("""
                SELECT c.ROWID, c.guid, c.display_name, c.service_name
                FROM chat c WHERE c.ROWID = ?
            """, (numeric_chat_id,))
            chat_row = chat_cursor.fetchone()
            if not chat_row:
                return {
                    "error": "chat_not_found",
                    "message": f"Chat not found: {chat_id}",
                }
            # Get participants
            participant_rows = get_chat_participants(conn, numeric_chat_id, resolver)
            # Build people map (handle -> short key), so messages can carry a
            # compact "from" key instead of a raw phone number.
            people = {"me": "Me"}
            handle_to_key = {}
            unknown_count = 0
            for i, p in enumerate(participant_rows):
                handle = p['handle']
                if p['name']:
                    # Use first name as key
                    key = p['name'].split()[0].lower()
                    # Handle duplicates (two "john"s) by suffixing the index.
                    if key in people:
                        key = f"{key}{i}"
                    people[key] = p['name']
                    handle_to_key[handle] = key
                else:
                    unknown_count += 1
                    key = f"unknown{unknown_count}"
                    people[key] = format_phone_display(handle)
                    handle_to_key[handle] = key
            # Convert time filters to Apple epoch
            since_apple = None
            before_apple = None
            if since:
                since_dt = parse_time_input(since)
                if since_dt:
                    since_apple = datetime_to_apple(since_dt)
            if before:
                before_dt = parse_time_input(before)
                if before_dt:
                    before_apple = datetime_to_apple(before_dt)
            # Resolve from_person to handle
            from_handle = None
            from_me_only = False
            # unanswered implies from_me_only (only my messages can be unanswered)
            if unanswered:
                from_me_only = True
            elif from_person:
                if from_person.lower() == "me":
                    from_me_only = True
                else:
                    from_handle = normalize_to_e164(from_person)
                    if not from_handle and resolver.is_available:
                        # NOTE(review): initialize() was already called above when
                        # available — presumably idempotent; confirm.
                        resolver.initialize()
                        # Use public search_by_name method instead of private _lookup
                        matches = resolver.search_by_name(from_person)
                        if matches:
                            from_handle = matches[0][0]
            # For unanswered filtering, we need to fetch more messages initially
            # since we'll filter some out, then apply limit after filtering
            fetch_limit = limit * 3 if unanswered else limit
            # Get messages
            message_rows = get_messages_for_chat(
                conn,
                numeric_chat_id,
                limit=fetch_limit,
                since_apple=since_apple,
                before_apple=before_apple,
                from_handle=from_handle,
                from_me_only=from_me_only,
                contains=contains,
            )
            # Filter for unanswered messages if requested
            if unanswered:
                filtered_rows = []
                for row in message_rows:
                    text = get_message_text(row['text'], row.get('attributedBody'))
                    # Check if message looks like a question and has no reply within window
                    if _looks_like_question(text) and not _has_reply_within_window(
                        conn, numeric_chat_id, row['date'], unanswered_hours
                    ):
                        filtered_rows.append(row)
                        # Stop once we have enough
                        if len(filtered_rows) >= limit:
                            break
                message_rows = filtered_rows
            # Get reactions for messages (one batched query keyed by GUID)
            reactions_map = {}
            if include_reactions and message_rows:
                message_guids = [m['guid'] for m in message_rows]
                reactions_map = get_reactions_for_messages(conn, message_guids)
            # Build response
            messages = []
            for row in message_rows:
                text = get_message_text(row['text'], row.get('attributedBody'))
                msg: dict[str, Any] = {
                    "id": f"msg_{row['id']}",
                    "ts": apple_to_datetime(row['date']).isoformat() if row['date'] else None,
                    "text": text,
                }
                # Add sender
                if row['is_from_me']:
                    msg["from"] = "me"
                elif row['sender_handle']:
                    # Fall back to the raw handle if it isn't a known participant.
                    msg["from"] = handle_to_key.get(row['sender_handle'], row['sender_handle'])
                # Add reactions if enabled
                if include_reactions and row['guid'] in reactions_map:
                    reactions = []
                    for r in reactions_map[row['guid']]:
                        reaction_type = get_reaction_type(r['type'])
                        # Skip "removed" tapbacks — only active reactions are shown.
                        if reaction_type and not reaction_type.startswith('removed'):
                            emoji = reaction_to_emoji(reaction_type)
                            if r['from_handle']:
                                from_key = handle_to_key.get(r['from_handle'], 'unknown')
                            else:
                                from_key = 'me'
                            reactions.append(f"{emoji} {from_key}")
                    if reactions:
                        msg["reactions"] = reactions
                # Extract links
                if text:
                    links = extract_links(text)
                    if links:
                        msg["links"] = links
                messages.append(msg)
            # Assign session IDs to messages
            messages, sessions_summary = _assign_sessions(messages, message_rows)
            # Filter by session if requested
            if session:
                # NOTE(review): this rebuilds `messages` but leaves `message_rows`
                # unfiltered, so the zip(messages, message_rows) in the enrichment
                # pass below pairs messages with the wrong rows when a session
                # filter is active — attachments/media can land on the wrong
                # message. Looks like a bug; confirm and filter both in lockstep.
                messages = [m for m in messages if m.get("session_id") == session]
                # Also filter sessions summary to only include the requested session
                sessions_summary = [s for s in sessions_summary if s["session_id"] == session]
            # Enrich messages with media and link data
            media_truncated = False
            total_media = 0
            if messages:
                # Fetch attachments for messages
                message_row_ids = [row['id'] for row in message_rows]
                attachments_map = get_attachments_for_messages(conn, message_row_ids)
                # Collect all items to process
                image_tasks = []  # (msg_idx, attachment, filepath)
                video_tasks = []
                audio_tasks = []
                url_to_msg_indices: dict[str, list[int]] = {}  # URL -> list of msg indices
                all_urls: list[str] = []
                for idx, (msg, row) in enumerate(zip(messages, message_rows)):
                    msg_id = row['id']
                    # Collect attachment processing tasks
                    if msg_id in attachments_map:
                        for att in attachments_map[msg_id]:
                            att_type = _get_attachment_type(att['mime_type'], att['uti'])
                            filename = att['filename']
                            if filename:
                                # Expand ~ in path
                                if filename.startswith('~'):
                                    filename = os.path.expanduser(filename)
                            if att_type == 'image':
                                image_tasks.append((idx, att, filename))
                            elif att_type == 'video':
                                video_tasks.append((idx, att, filename))
                            elif att_type == 'audio':
                                audio_tasks.append((idx, att, filename))
                            else:
                                # Other types go directly to attachments
                                if 'attachments' not in msg:
                                    msg['attachments'] = []
                                msg['attachments'].append({
                                    'type': att_type,
                                    'filename': att['filename'].split('/')[-1] if att['filename'] else None,
                                    'size': att['total_bytes'],
                                })
                    # Collect links for enrichment (links already extracted during message building)
                    if 'links' in msg and msg['links']:
                        for url in msg['links']:
                            # Deduplicate URLs across messages; remember every
                            # message index that referenced each URL.
                            if url not in url_to_msg_indices:
                                url_to_msg_indices[url] = []
                                all_urls.append(url)
                            url_to_msg_indices[url].append(idx)
                        # Clear raw links, will be replaced with enriched
                        del msg['links']
                # Process media in parallel
                processed_count = 0
                with ThreadPoolExecutor(max_workers=4) as executor:
                    # Get image metadata (no base64, just dimensions and size)
                    image_futures = {
                        executor.submit(get_image_metadata, path): (idx, att)
                        for idx, att, path in image_tasks[:MAX_MEDIA]
                    }
                    for future in image_futures:
                        idx, att = image_futures[future]
                        try:
                            result = future.result()
                            if result:
                                if 'media' not in messages[idx]:
                                    messages[idx]['media'] = []
                                # Build metadata-only response with attachment ID for retrieval
                                messages[idx]['media'].append({
                                    'type': 'image',
                                    'id': f"att{att['id']}",
                                    'filename': result['filename'],
                                    'size_bytes': result['size_bytes'],
                                    'size_human': _format_file_size(result['size_bytes']),
                                    'dimensions': result['dimensions'],
                                })
                                processed_count += 1
                            else:
                                # Failed - add to attachments
                                if 'attachments' not in messages[idx]:
                                    messages[idx]['attachments'] = []
                                messages[idx]['attachments'].append({
                                    'type': 'image',
                                    'filename': att['filename'].split('/')[-1] if att['filename'] else None,
                                    'size': att['total_bytes'],
                                })
                        except Exception:
                            # Best-effort enrichment: a failed thumbnail never
                            # fails the whole request.
                            pass
                    # Process videos (videos share the MAX_MEDIA budget with images)
                    remaining_slots = MAX_MEDIA - processed_count
                    video_futures = {
                        executor.submit(process_video, path): (idx, att)
                        for idx, att, path in video_tasks[:remaining_slots]
                    }
                    for future in video_futures:
                        idx, att = video_futures[future]
                        try:
                            result = future.result()
                            if result:
                                if 'media' not in messages[idx]:
                                    messages[idx]['media'] = []
                                # Add attachment ID for full resolution retrieval
                                result['id'] = f"att{att['id']}"
                                messages[idx]['media'].append(result)
                                processed_count += 1
                            else:
                                if 'attachments' not in messages[idx]:
                                    messages[idx]['attachments'] = []
                                messages[idx]['attachments'].append({
                                    'type': 'video',
                                    'filename': att['filename'].split('/')[-1] if att['filename'] else None,
                                    'size': att['total_bytes'],
                                })
                        except Exception:
                            pass
                    # Process audio (just duration, always goes to attachments)
                    audio_futures = {
                        executor.submit(process_audio, path): (idx, att)
                        for idx, att, path in audio_tasks
                    }
                    for future in audio_futures:
                        idx, att = audio_futures[future]
                        try:
                            result = future.result()
                            if 'attachments' not in messages[idx]:
                                messages[idx]['attachments'] = []
                            if result:
                                messages[idx]['attachments'].append(result)
                            else:
                                messages[idx]['attachments'].append({
                                    'type': 'audio',
                                    'filename': att['filename'].split('/')[-1] if att['filename'] else None,
                                    'size': att['total_bytes'],
                                })
                        except Exception:
                            pass
                # Enrich links (capped at MAX_LINKS)
                if all_urls:
                    urls_to_enrich = all_urls[:MAX_LINKS]
                    enriched = enrich_links(urls_to_enrich)
                    for url, link_data in zip(urls_to_enrich, enriched):
                        # Fan enriched data back out to every message that had the URL.
                        for msg_idx in url_to_msg_indices[url]:
                            if 'links' not in messages[msg_idx]:
                                messages[msg_idx]['links'] = []
                            messages[msg_idx]['links'].append(link_data)
                # Calculate truncation info
                total_media = len(image_tasks) + len(video_tasks)
                media_truncated = total_media > MAX_MEDIA
            # Build chat info
            participant_objs = [
                Participant(handle=p['handle'], name=p['name'])
                for p in participant_rows
            ]
            display_name = chat_row['display_name'] or generate_display_name(participant_objs)
            response = {
                "chat": {
                    "id": f"chat{numeric_chat_id}",
                    "name": display_name,
                },
                "people": people,
                "messages": messages,
                "sessions": sessions_summary,
                # "more" is a length heuristic: a full page suggests more rows
                # exist. NOTE(review): after session/unanswered filtering this
                # can under- or over-report; confirm acceptable.
                "more": len(messages) == limit,
                "cursor": None,
            }
            # Add media truncation info if applicable
            if media_truncated:
                response["media_truncated"] = True
                response["media_total"] = total_media
                response["media_included"] = MAX_MEDIA
            # Add suggestions when no messages found
            if not messages:
                suggestions = get_message_suggestions(
                    conn,
                    resolver,
                    query=contains,
                    chat_id=numeric_chat_id,
                    since=since,
                    from_person=from_person,
                )
                if suggestions:
                    response["suggestions"] = suggestions
            return response
    except FileNotFoundError:
        return {
            "error": "database_not_found",
            "message": f"Database not found at {db_path}",
        }
    except Exception as e:
        # Top-level boundary: convert any unexpected failure into a
        # structured error payload instead of propagating.
        return {
            "error": "internal_error",
            "message": str(e),
        }