/**
* Messages namespace - Internal mailbox system
*/
import { v4 as uuidv4 } from 'uuid';
import { MCPServer } from '../core/server.js';
import { InvalidArgError, NotFoundError } from '../core/errors.js';
import {
MessageTarget,
MessageSender,
Thread,
Message,
CreateThreadResponse,
PostMessageResponse,
ThreadFilter,
ListThreadsResponse,
GetThreadResponse,
UpdateMessageResponse,
InboxResponse
} from '../types/messages.js';
/**
 * Internal storage shape for a thread: the public `Thread` fields plus the
 * list of messages that belong to it. This interface is not exported from
 * this module.
 */
interface StoredThread extends Thread {
// Messages belonging to this thread.
messages: Message[];
}
/**
 * In-memory mailbox exposed as `messages.*` tools on the MCP server registry.
 *
 * Threads and messages are held only in this instance's maps. Read state is
 * tracked per message (`unreadMessageIds`), and every thread's `unread_count`
 * is kept in step with that set so that marking a message read is idempotent
 * and deleting an unread message does not leave a stale counter behind.
 */
export class MessagesNamespace {
  private readonly mcpServer: MCPServer;

  /** All threads, keyed by `thread_id`. */
  private readonly threads = new Map<string, StoredThread>();

  /** All messages, keyed by message id. Values are the same objects stored in their thread. */
  private readonly messages = new Map<string, Message>();

  /** Ids of messages that have been posted but not yet marked read. */
  private readonly unreadMessageIds = new Set<string>();

  /**
   * @param mcpServer Server whose registry receives the `messages.*` tools.
   */
  constructor(mcpServer: MCPServer) {
    this.mcpServer = mcpServer;
    this.registerTools();
  }

  /**
   * Registers every `messages.*` tool (name, description, JSON input schema)
   * with the server registry, each bound to the matching handler below.
   */
  private registerTools(): void {
    const registry = this.mcpServer.getRegistry();

    registry.registerTool(
      'messages.create_thread',
      {
        name: 'messages.create_thread',
        description: 'Create a new message thread',
        inputSchema: {
          type: 'object',
          properties: {
            subject: { type: 'string' },
            target: { type: 'string', enum: ['self', 'user'] },
            tags: { type: 'array', items: { type: 'string' } }
          },
          required: ['subject', 'target']
        }
      },
      this.createThread.bind(this)
    );

    registry.registerTool(
      'messages.post_message',
      {
        name: 'messages.post_message',
        description: 'Post a message to a thread',
        inputSchema: {
          type: 'object',
          properties: {
            thread_id: { type: 'string' },
            body: { type: 'string' },
            meta: {
              type: 'object',
              properties: {
                in_reply_to: { type: 'string' }
              }
            }
          },
          required: ['thread_id', 'body']
        }
      },
      this.postMessage.bind(this)
    );

    registry.registerTool(
      'messages.list_threads',
      {
        name: 'messages.list_threads',
        description: 'List message threads with filtering',
        inputSchema: {
          type: 'object',
          properties: {
            filter: {
              type: 'object',
              properties: {
                target: { type: 'string', enum: ['self', 'user'] },
                tags: { type: 'array', items: { type: 'string' } },
                q: { type: 'string' },
                limit: { type: 'number' },
                cursor: { type: 'string' }
              }
            }
          }
        }
      },
      this.listThreads.bind(this)
    );

    registry.registerTool(
      'messages.get_thread',
      {
        name: 'messages.get_thread',
        description: 'Get a thread with optional messages',
        inputSchema: {
          type: 'object',
          properties: {
            thread_id: { type: 'string' },
            include_messages: { type: 'boolean' }
          },
          required: ['thread_id']
        }
      },
      this.getThread.bind(this)
    );

    registry.registerTool(
      'messages.update_message',
      {
        name: 'messages.update_message',
        description: 'Update a message body',
        inputSchema: {
          type: 'object',
          properties: {
            message_id: { type: 'string' },
            body: { type: 'string' }
          },
          required: ['message_id', 'body']
        }
      },
      this.updateMessage.bind(this)
    );

    registry.registerTool(
      'messages.delete_message',
      {
        name: 'messages.delete_message',
        description: 'Delete a message',
        inputSchema: {
          type: 'object',
          properties: {
            message_id: { type: 'string' }
          },
          required: ['message_id']
        }
      },
      this.deleteMessage.bind(this)
    );

    registry.registerTool(
      'messages.inbox',
      {
        name: 'messages.inbox',
        description: 'Get inbox messages for a target',
        inputSchema: {
          type: 'object',
          properties: {
            target: { type: 'string', enum: ['self', 'user'] },
            only_unread: { type: 'boolean' },
            limit: { type: 'number' },
            cursor: { type: 'string' }
          },
          required: ['target']
        }
      },
      this.inbox.bind(this)
    );

    registry.registerTool(
      'messages.mark_read',
      {
        name: 'messages.mark_read',
        description: 'Mark a message as read',
        inputSchema: {
          type: 'object',
          properties: {
            message_id: { type: 'string' }
          },
          required: ['message_id']
        }
      },
      this.markRead.bind(this)
    );
  }

  /**
   * Creates an empty thread with a fresh id.
   *
   * `created_at` and `last_message_at` are both set to the creation time and
   * `unread_count` starts at zero.
   *
   * @returns The id of the new thread.
   */
  private async createThread(params: {
    subject: string;
    target: MessageTarget;
    tags?: string[];
  }): Promise<CreateThreadResponse> {
    const threadId = uuidv4();
    const now = new Date().toISOString();

    const thread: StoredThread = {
      thread_id: threadId,
      subject: params.subject,
      target: params.target,
      tags: params.tags,
      created_at: now,
      last_message_at: now,
      unread_count: 0,
      messages: []
    };
    this.threads.set(threadId, thread);

    return { thread_id: threadId };
  }

  /**
   * Appends a message (always sent as `'system'`) to an existing thread,
   * bumps the thread's `last_message_at`, and records the message as unread.
   *
   * @returns The new message's id and creation timestamp.
   * @throws NotFoundError when `thread_id` does not match a stored thread.
   */
  private async postMessage(params: {
    thread_id: string;
    body: string;
    meta?: { in_reply_to?: string };
  }): Promise<PostMessageResponse> {
    const thread = this.threads.get(params.thread_id);
    if (!thread) {
      throw new NotFoundError('Thread', params.thread_id);
    }

    const messageId = uuidv4();
    const now = new Date().toISOString();
    const message: Message = {
      id: messageId,
      thread_id: params.thread_id,
      from: 'system',
      body: params.body,
      created_at: now,
      meta: params.meta
    };

    // The same object is stored in both places, so later edits are visible
    // through the thread and through the id lookup alike.
    thread.messages.push(message);
    this.messages.set(messageId, message);

    thread.last_message_at = now;
    thread.unread_count++;
    this.unreadMessageIds.add(messageId);

    return {
      message_id: messageId,
      created_at: now
    };
  }

  /**
   * Lists thread summaries, most recently active first.
   *
   * Optional filters: exact `target`, at least one matching tag, and a
   * case-insensitive substring `q` matched against the subject or any message
   * body. Results are paged with a default page size of 20.
   *
   * @param params May be omitted entirely; no field is required.
   * @returns One page of thread summaries (without messages) and, when more
   *          results remain, the cursor for the next page.
   */
  private async listThreads(
    params: {
      filter?: ThreadFilter;
    } = {}
  ): Promise<ListThreadsResponse> {
    const filter = params.filter ?? {};
    let threads = Array.from(this.threads.values());

    if (filter.target) {
      const wantedTarget = filter.target;
      threads = threads.filter(t => t.target === wantedTarget);
    }

    if (filter.tags && filter.tags.length > 0) {
      const wantedTags = filter.tags;
      threads = threads.filter(t => wantedTags.some(tag => t.tags?.includes(tag)));
    }

    if (filter.q) {
      const query = filter.q.toLowerCase();
      threads = threads.filter(
        t =>
          t.subject.toLowerCase().includes(query) ||
          t.messages.some(m => m.body.toLowerCase().includes(query))
      );
    }

    // Newest activity first.
    threads.sort((a, b) => Date.parse(b.last_message_at) - Date.parse(a.last_message_at));

    const { page, nextCursor } = this.paginate(threads, filter.limit, filter.cursor, 20);

    return {
      threads: page.map(t => this.toThreadSummary(t)),
      next_cursor: nextCursor
    };
  }

  /**
   * Fetches one thread's summary, optionally with its messages.
   *
   * @returns The thread summary; `messages` is present only when
   *          `include_messages` is truthy, and is a copy of the stored list.
   * @throws NotFoundError when `thread_id` does not match a stored thread.
   */
  private async getThread(params: {
    thread_id: string;
    include_messages?: boolean;
  }): Promise<GetThreadResponse> {
    const thread = this.threads.get(params.thread_id);
    if (!thread) {
      throw new NotFoundError('Thread', params.thread_id);
    }

    const threadData: Thread & { messages?: Message[] } = this.toThreadSummary(thread);
    if (params.include_messages) {
      // Copy so callers cannot reorder or truncate the stored list.
      threadData.messages = [...thread.messages];
    }

    return { thread: threadData };
  }

  /**
   * Replaces a message's body and stamps `edited_at` with the current time.
   *
   * @returns `ok: true` and the new `edited_at` timestamp.
   * @throws NotFoundError when `message_id` does not match a stored message.
   */
  private async updateMessage(params: {
    message_id: string;
    body: string;
  }): Promise<UpdateMessageResponse> {
    const message = this.messages.get(params.message_id);
    if (!message) {
      throw new NotFoundError('Message', params.message_id);
    }

    const editedAt = new Date().toISOString();
    // The thread's message list holds this very object (see postMessage), so
    // mutating it here updates the thread view too; no write-back is needed.
    message.body = params.body;
    message.edited_at = editedAt;

    return {
      ok: true,
      edited_at: editedAt
    };
  }

  /**
   * Removes a message from its thread and from the id lookup. If the message
   * was still unread, the thread's `unread_count` is reduced accordingly.
   *
   * @throws NotFoundError when `message_id` does not match a stored message.
   */
  private async deleteMessage(params: {
    message_id: string;
  }): Promise<{ ok: true }> {
    const message = this.messages.get(params.message_id);
    if (!message) {
      throw new NotFoundError('Message', params.message_id);
    }

    // Settle the unread bookkeeping before the message disappears.
    this.consumeUnread(message);

    const thread = this.threads.get(message.thread_id);
    if (thread) {
      thread.messages = thread.messages.filter(m => m.id !== params.message_id);
    }
    this.messages.delete(params.message_id);

    return { ok: true };
  }

  /**
   * Collects messages from every thread addressed to `target`, newest first.
   *
   * With `only_unread`, only messages that have not been marked read are
   * returned. Results are paged with a default page size of 50.
   *
   * @returns One page of messages and, when more remain, the next cursor.
   */
  private async inbox(params: {
    target: MessageTarget;
    only_unread?: boolean;
    limit?: number;
    cursor?: string;
  }): Promise<InboxResponse> {
    const collected: Message[] = [];
    for (const thread of this.threads.values()) {
      if (thread.target !== params.target) {
        continue;
      }
      for (const message of thread.messages) {
        if (!params.only_unread || this.unreadMessageIds.has(message.id)) {
          collected.push(message);
        }
      }
    }

    // Newest first.
    collected.sort((a, b) => Date.parse(b.created_at) - Date.parse(a.created_at));

    const { page, nextCursor } = this.paginate(collected, params.limit, params.cursor, 50);

    return {
      messages: page,
      next_cursor: nextCursor
    };
  }

  /**
   * Marks a message as read. Calling it again for the same message is a
   * no-op, so a thread's `unread_count` drops at most once per message.
   *
   * @throws NotFoundError when `message_id` does not match a stored message.
   */
  private async markRead(params: {
    message_id: string;
  }): Promise<{ ok: true }> {
    const message = this.messages.get(params.message_id);
    if (!message) {
      throw new NotFoundError('Message', params.message_id);
    }

    this.consumeUnread(message);

    return { ok: true };
  }

  /** Clears a message's unread flag and, only if it was set, decrements its thread's counter. */
  private consumeUnread(message: Message): void {
    // Set.delete reports whether the id was present, i.e. whether it was unread.
    if (!this.unreadMessageIds.delete(message.id)) {
      return;
    }
    const thread = this.threads.get(message.thread_id);
    if (thread && thread.unread_count > 0) {
      thread.unread_count--;
    }
  }

  /** Projects a stored thread onto its public summary fields (everything except `messages`). */
  private toThreadSummary(thread: StoredThread): Thread {
    return {
      thread_id: thread.thread_id,
      subject: thread.subject,
      target: thread.target,
      tags: thread.tags,
      created_at: thread.created_at,
      last_message_at: thread.last_message_at,
      unread_count: thread.unread_count
    };
  }

  /**
   * Slices one page out of `items` using an offset cursor.
   *
   * A missing, zero, negative or non-finite `limit` falls back to
   * `defaultLimit`; fractional limits are rounded down. A missing,
   * non-numeric or negative `cursor` starts from the first item.
   */
  private paginate<T>(
    items: readonly T[],
    limit: number | undefined,
    cursor: string | undefined,
    defaultLimit: number
  ): { page: T[]; nextCursor: string | undefined } {
    const pageSize =
      typeof limit === 'number' && Number.isFinite(limit) && limit >= 1
        ? Math.floor(limit)
        : defaultLimit;

    const parsedOffset = cursor ? Number.parseInt(cursor, 10) : 0;
    const start = Number.isFinite(parsedOffset) && parsedOffset > 0 ? parsedOffset : 0;
    const end = start + pageSize;

    return {
      page: items.slice(start, end),
      nextCursor: end < items.length ? String(end) : undefined
    };
  }
}