list_conversations
Retrieve all direct and group conversations sorted by most recent message, providing an up-to-date overview of your Signal chats.
Instructions
List all conversations (direct and group) ordered by most recent message
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- src/signal_mcp/client.py:855-862 (handler) SignalClient.list_conversations() — async handler that delegates to store.list_conversations(), then enriches each conversation with resolved contact/group names from the caches.
async def list_conversations(self) -> list[dict]: convs = await asyncio.to_thread(_store.list_conversations, own_number=self.account) for conv in convs: if conv["type"] == "direct": conv["name"] = self.resolve_name(conv["id"]) elif conv["type"] == "group": conv["name"] = self.resolve_group_name(conv["id"]) return convs - src/signal_mcp/server.py:1415-1420 (handler) MCP server call_tool handler for 'list_conversations' — ensures contact/group caches are loaded, then calls client.list_conversations() and returns the result.
elif name == "list_conversations": await client._ensure_contact_cache() await client._ensure_group_cache() # client.list_conversations() already resolves names via resolve_name/resolve_group_name conversations = await client.list_conversations() return _ok(conversations) - src/signal_mcp/store.py:298-357 (helper) store.list_conversations() — core SQLite query that groups messages by conversation ID, computes type (direct/group), last_message_at, message_count, unread_count, and the last message body via a window function.
def list_conversations(own_number: str = "") -> list[dict]: """Return all distinct conversations ordered by most recent message.""" init_db() with _db() as conn: rows = conn.execute( """SELECT COALESCE(group_id, CASE WHEN sender = ? THEN recipient ELSE sender END ) AS id, CASE WHEN group_id IS NOT NULL THEN 'group' ELSE 'direct' END AS type, MAX(timestamp) AS last_message_at, COUNT(*) AS message_count, SUM(CASE WHEN is_read = 0 AND sender != ? THEN 1 ELSE 0 END) AS unread_count FROM messages WHERE COALESCE(group_id, CASE WHEN sender = ? THEN recipient ELSE sender END ) IS NOT NULL GROUP BY COALESCE(group_id, CASE WHEN sender = ? THEN recipient ELSE sender END ) ORDER BY last_message_at DESC""", (own_number, own_number, own_number, own_number), ).fetchall() # Fetch last message body per conversation using a window function — # single table scan, no IN list, no variable-count risk. last_body: dict[str, str] = {} if rows: snippet_rows = conn.execute( """SELECT conv_id, body FROM ( SELECT COALESCE(group_id, CASE WHEN sender = ? THEN recipient ELSE sender END ) AS conv_id, body, ROW_NUMBER() OVER ( PARTITION BY COALESCE(group_id, CASE WHEN sender = ? THEN recipient ELSE sender END ) ORDER BY timestamp DESC ) AS rn FROM messages WHERE COALESCE(group_id, CASE WHEN sender = ? THEN recipient ELSE sender END ) IS NOT NULL ) WHERE rn = 1""", (own_number, own_number, own_number), ).fetchall() for s in snippet_rows: last_body[s["conv_id"]] = s["body"] return [ { "id": r["id"], "type": r["type"], "last_message_at": datetime.fromtimestamp(r["last_message_at"] / 1000).isoformat(), "message_count": r["message_count"], "unread_count": r["unread_count"] or 0, "last_message": last_body.get(r["id"], ""), } for r in rows ] - src/signal_mcp/server.py:481-485 (registration) MCP Tool schema definition for 'list_conversations' — registers the tool with name, description, and empty input schema (no required params).
Tool( name="list_conversations", description="List all conversations (direct and group) ordered by most recent message", inputSchema={"type": "object", "properties": {}}, ), - src/signal_mcp/cli.py:348-374 (handler) CLI 'conversations' command handler — calls client.list_conversations() and formats output as text or JSON.
@cli.command()
@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
def conversations(as_json: bool):
    """List all conversations ordered by most recent message.

    Loads the contact and group caches, fetches the conversation list from
    the client, and prints it either as indented JSON (``--json``) or as a
    plain-text table with one row per conversation followed by an optional
    snippet of the last message (first 60 characters).

    On ``SignalError`` the message is written to stderr and the process
    exits with status 1.
    """

    async def _run():
        async with SignalClient() as client:
            # Caches are loaded first so list_conversations() can attach names.
            await client._ensure_contact_cache()
            await client._ensure_group_cache()
            convs = await client.list_conversations()

            if not convs:
                click.echo("No conversations found.")
                return

            if as_json:
                click.echo(json.dumps(convs, indent=2))
                return

            for c in convs:
                unread = f" ({c['unread_count']} unread)" if c.get("unread_count") else ""
                # Fall back to the raw conversation id when no name was resolved.
                name = c.get("name") or c["id"]
                # "last_message" can be present but None (presumably when the
                # stored body is NULL, e.g. an attachment-only message);
                # slicing None would raise TypeError, so normalise to "".
                snippet = (c.get("last_message") or "")[:60]
                click.echo(f"{name:<35} {c['type']:<7}{unread}")
                if snippet:
                    click.echo(f" {snippet}")

    try:
        run(_run())
    except SignalError as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)