read_messages
Retrieve messages from a Signal chat, sorted newest first. Each message shows sender, date, body, reactions, and attachments. Supports pagination and time range filters.
Instructions
Read messages from a single Signal chat, returned newest-first.
Each message includes sender, date, body text, reactions, and attachment metadata. Read-only with no side effects. Requires an exact chat name from list_chats. Use search_messages instead to find messages by keyword across chats.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| chat_name | Yes | Exact chat name as returned by list_chats (case-sensitive). | |
| limit | No | Maximum number of messages to return, between 1 and 200. | |
| offset | No | Number of messages to skip from the most recent, for pagination (0-10000). | |
| after | No | ISO 8601 datetime; only return messages sent after this time, e.g. '2025-01-15T00:00:00'. | |
| before | No | ISO 8601 datetime; only return messages sent before this time, e.g. '2025-02-01T00:00:00'. |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- src/mcp_signal/server.py:93-159 (registration)MCP tool registration via @mcp.tool() decorator. Defines the 'read_messages' FastMCP tool with input schema (chat_name, limit, offset, after, before) and delegates to reader.read_messages().
@mcp.tool() def read_messages( chat_name: Annotated[ str, Field( description=( "Exact chat name as returned by list_chats" " (case-sensitive)." ), ), ], limit: Annotated[ int, Field( description=( "Maximum number of messages to return," " between 1 and 200." ), ), ] = 20, offset: Annotated[ int, Field( description=( "Number of messages to skip from the most" " recent, for pagination (0-10000)." ), ), ] = 0, after: Annotated[ str | None, Field( description=( "ISO 8601 datetime; only return messages sent" " after this time, e.g." " '2025-01-15T00:00:00'." ), ), ] = None, before: Annotated[ str | None, Field( description=( "ISO 8601 datetime; only return messages sent" " before this time, e.g." " '2025-02-01T00:00:00'." ), ), ] = None, ) -> list[dict[str, Any]]: """Read messages from a single Signal chat, returned newest-first. Each message includes sender, date, body text, reactions, and attachment metadata. Read-only with no side effects. Requires an exact chat name from list_chats. Use search_messages instead to find messages by keyword across chats. """ limit = min(max(limit, 1), _MAX_LIMIT) offset = min(max(offset, 0), _MAX_OFFSET) return reader.read_messages( chat_name, limit=limit, offset=offset, after=after, before=before, ) - src/mcp_signal/server.py:94-141 (schema)Input schema/type definitions for the read_messages tool using Pydantic Field annotations: chat_name (str, exact name from list_chats), limit (1-200, default 20), offset (0-10000, default 0), after (optional ISO 8601 datetime), before (optional ISO 8601 datetime).
def read_messages( chat_name: Annotated[ str, Field( description=( "Exact chat name as returned by list_chats" " (case-sensitive)." ), ), ], limit: Annotated[ int, Field( description=( "Maximum number of messages to return," " between 1 and 200." ), ), ] = 20, offset: Annotated[ int, Field( description=( "Number of messages to skip from the most" " recent, for pagination (0-10000)." ), ), ] = 0, after: Annotated[ str | None, Field( description=( "ISO 8601 datetime; only return messages sent" " after this time, e.g." " '2025-01-15T00:00:00'." ), ), ] = None, before: Annotated[ str | None, Field( description=( "ISO 8601 datetime; only return messages sent" " before this time, e.g." " '2025-02-01T00:00:00'." ), ), ] = None, - src/mcp_signal/reader.py:217-250 (handler)Core handler implementation of read_messages() in DesktopReader class. Fetches data, filters by exact chat_name, sorts messages newest-first, applies offset/limit and optional after/before date filters, formats messages via _format_message(), and returns results.
def read_messages( self, chat_name: str, *, limit: int = 20, offset: int = 0, after: str | None = None, before: str | None = None, ) -> list[dict[str, Any]]: try: start_date = datetime.fromisoformat(after) if after else None end_date = datetime.fromisoformat(before) if before else None except ValueError as exc: raise ValueError(f"Invalid date format (expected ISO 8601): {exc}") from exc convos, contacts, self_contact = self._fetch_data(start_date=start_date, end_date=end_date) self_id = self_contact.serviceId if self_contact else None sid_lookup = _build_sid_lookup(contacts) for chat_id, messages in convos.items(): contact = contacts.get(chat_id) if not contact: continue name = contact.name or contact.number or "Unknown" if name != chat_name: continue sorted_messages = sorted(messages, key=lambda message: message.get_ts(), reverse=True) end_idx = offset + limit if limit else len(sorted_messages) return [ _format_message( name, asdict(message), self_id, contact, sid_lookup, source_dir=self._config.source_dir, ) for message in sorted_messages[offset:end_idx] ] return [] - src/mcp_signal/reader.py:59-95 (helper)Helper function _format_message() used by read_messages to format individual messages. Extracts body, quote, sticker, reactions, and attachments, wraps untrusted fields (body, quote, sticker) with XML-like delimiters for prompt injection defense.
def _format_message( chat_name: str, raw: dict[str, Any], self_id: str | None, contact: Any, sid_lookup: dict[str, Any] | None = None, source_dir: Path | None = None, ) -> dict[str, Any]: dt = _parse_ts(raw) body = raw.get("body", "") or "" quote = raw.get("quote", "") or "" sticker = raw.get("sticker", "") or "" attachments: list[dict[str, str]] = [ { "file_name": _wrap_untrusted(a.get("fileName") or ""), "content_type": a.get("contentType") or "", "size": str(a.get("size") or ""), "encrypted_path": str( source_dir / "attachments.noindex" / str(a.get("path", "")).replace("\\", "/") ) if a.get("path") and source_dir else "", "local_key": a.get("localKey") or "", "version": str(a.get("version") or ""), } for a in (raw.get("attachments") or []) ] if raw.get("attachments") else [] return { "chat_name": chat_name, "date": dt.isoformat() if dt else "", "sender": _sender_name(raw, self_id, contact, sid_lookup), "body": _wrap_untrusted(body), "quote": _wrap_untrusted(quote) if isinstance(quote, str) else quote, "sticker": _wrap_untrusted(sticker) if isinstance(sticker, str) else sticker, "reactions": raw.get("reactions", []) or [], "attachments": attachments, "_content_type": "untrusted_user_content", "_untrusted_fields": ["body", "quote", "sticker"], } - src/mcp_signal/reader.py:42-56 (helper)Helper function _sender_name() determines the display name for a message sender. Returns 'Me' for outgoing messages, resolves group member names via sid_lookup, falls back to contact name/number.
def _sender_name( raw: dict[str, Any], self_id: str | None, contact: Any, sid_lookup: dict[str, Any] | None = None, ) -> str: if _is_outgoing(raw, self_id): return "Me" source = raw.get("source") if contact.is_group and source and sid_lookup: sender = sid_lookup.get(source) if sender: return sender.name or sender.profile_name or sender.number or source return source return contact.name or contact.number or "Unknown"