Skip to main content
Glama
mkXultra
by mkXultra
MessageService.ts12.8 kB
import { MessageStorage } from './MessageStorage'; import { MessageValidator } from './MessageValidator'; import { MessageCache } from './MessageCache'; import { generateUUID } from './utils'; import { getDataDirectory } from '../../utils/dataDir'; import { LockService } from '../../services/LockService'; import { SendMessageParams, SendMessageResponse, GetMessagesParams, MessageListResponse, MessageStorageData, WaitForMessagesParams, WaitForMessagesResponse, WaitingAgentInfo, ReadStatusInfo, Message } from './types/messaging.types'; import { WAIT_CONSTANTS, READ_STATUS_FILENAME, WAITING_AGENTS_FILENAME } from './constants'; import { RoomNotFoundError, AgentNotInRoomError } from '../../errors/AppError'; import * as fs from 'fs/promises'; import * as path from 'path'; export class MessageService { private readonly storage: MessageStorage; private readonly cache: MessageCache; private readonly lockService: LockService; constructor(dataDir: string = getDataDirectory(), cacheCapacity?: number, lockService?: LockService) { this.lockService = lockService || new LockService(dataDir); this.storage = new MessageStorage(dataDir, this.lockService); this.cache = new MessageCache(cacheCapacity); } async sendMessage(params: SendMessageParams): Promise<SendMessageResponse> { // Validate input parameters const validatedParams = MessageValidator.validateSendMessage(params); // Room existence check is handled by MessagingAdapter // Generate message ID and timestamp const messageId = generateUUID(); const timestamp = new Date().toISOString(); // Extract mentions from message const mentions = MessageValidator.extractMentions(validatedParams.message); // Create message data for storage const messageData: MessageStorageData = { id: messageId, agentName: validatedParams.agentName, message: validatedParams.message, timestamp, mentions, metadata: validatedParams.metadata }; // Save message to storage await this.storage.saveMessage(validatedParams.roomName, messageData); // Clear cache for this room since new message was added this.cache.clear(validatedParams.roomName); // Return response return { success: true, messageId, roomName: validatedParams.roomName, timestamp, mentions }; } async getMessages(params: GetMessagesParams): Promise<MessageListResponse> { // Validate input parameters const validatedParams = MessageValidator.validateGetMessages(params); // Room existence check is handled by MessagingAdapter // Check cache first const cachedResult = this.cache.get(validatedParams); if (cachedResult) { return cachedResult; } // Get messages from storage const result = await this.storage.getMessages(validatedParams); // Prepare response const response: MessageListResponse = { roomName: validatedParams.roomName, messages: result.messages, count: result.messages.length, hasMore: result.hasMore }; // Cache the result this.cache.set(validatedParams, response); return response; } async getMessageCount(roomName: string): Promise<number> { // Validate room name MessageValidator.validateGetMessages({ roomName }); // Room existence check is handled by caller return await this.storage.getMessageCount(roomName); } clearRoomCache(roomName: string): void { this.cache.clear(roomName); } async waitForMessages(params: WaitForMessagesParams): Promise<WaitForMessagesResponse> { // Validate input parameters const validatedParams = MessageValidator.validateWaitForMessages(params); // Note: Room existence and agent membership checks should be done by the adapter layer // The MessageService itself doesn't have access to room data const timeout = validatedParams.timeout || WAIT_CONSTANTS.DEFAULT_TIMEOUT; const startTime = Date.now(); let pollInterval = WAIT_CONSTANTS.INITIAL_POLL_INTERVAL; // Add agent to waiting list await this.addWaitingAgent(validatedParams.roomName, validatedParams.agentName, timeout); // Send system message about waiting await this.sendSystemMessage( validatedParams.roomName, `${validatedParams.agentName} is waiting for new messages (timeout: ${timeout}ms)` ); // Check for deadlock const waitingAgents = await this.getWaitingAgents(validatedParams.roomName); const otherWaitingAgents = waitingAgents .filter(agent => agent.agentName !== validatedParams.agentName) .map(agent => agent.agentName); let warning: string | undefined; if (otherWaitingAgents.length >= 1) { warning = `Potential deadlock detected: ${otherWaitingAgents.length} other agent(s) are also waiting for messages`; } try { // Poll for new messages while (Date.now() - startTime < timeout) { // Get unread messages const unreadMessages = await this.getUnreadMessages( validatedParams.roomName, validatedParams.agentName ); if (unreadMessages.length > 0) { // Update read status const lastMessage = unreadMessages[unreadMessages.length - 1]!; // Safe because we checked length > 0 await this.updateReadStatus( validatedParams.roomName, validatedParams.agentName, lastMessage ); // Remove from waiting list await this.removeWaitingAgent(validatedParams.roomName, validatedParams.agentName); return { messages: unreadMessages, hasNewMessages: true, timedOut: false, warning, waitingAgents: otherWaitingAgents.length > 0 ? otherWaitingAgents : undefined }; } // Wait before next poll with exponential backoff await new Promise(resolve => setTimeout(resolve, pollInterval)); pollInterval = Math.min(pollInterval * WAIT_CONSTANTS.BACKOFF_FACTOR, WAIT_CONSTANTS.MAX_POLL_INTERVAL); } // Timeout reached await this.removeWaitingAgent(validatedParams.roomName, validatedParams.agentName); // Send system message about timeout await this.sendSystemMessage( validatedParams.roomName, `${validatedParams.agentName} stopped waiting (timeout reached)` ); return { messages: [], hasNewMessages: false, timedOut: true, warning, waitingAgents: otherWaitingAgents.length > 0 ? otherWaitingAgents : undefined }; } catch (error) { // Clean up on error try { await this.removeWaitingAgent(validatedParams.roomName, validatedParams.agentName); } catch (cleanupError) { // Ignore cleanup errors } throw error; } } private async getUnreadMessages(roomName: string, agentName: string): Promise<Message[]> { // Get last read status const readStatus = await this.getReadStatus(roomName, agentName); // Get all messages const allMessages = await this.storage.getAllMessages(roomName); // Filter unread messages (excluding agent's own messages) let unreadMessages: Message[] = []; if (readStatus) { const lastReadIndex = allMessages.findIndex(msg => msg.id === readStatus.lastReadMessageId); if (lastReadIndex >= 0) { unreadMessages = allMessages.slice(lastReadIndex + 1); } else { // If last read message not found, consider all messages as unread unreadMessages = allMessages; } } else { // No read status, all messages are unread unreadMessages = allMessages; } // Filter out agent's own messages and system messages return unreadMessages.filter(msg => msg.agentName !== agentName && msg.agentName !== 'system'); } private async getReadStatus(roomName: string, agentName: string): Promise<ReadStatusInfo | null> { const readStatusPath = path.join(this.storage.dataDirectory, 'rooms', roomName, READ_STATUS_FILENAME); try { const data = await fs.readFile(readStatusPath, 'utf-8'); const readStatuses = JSON.parse(data); return readStatuses[agentName] || null; } catch (error: any) { if (error.code === 'ENOENT') { return null; } // For corrupted files, return null to treat as no read status if (error instanceof SyntaxError) { return null; } throw error; } } private async updateReadStatus(roomName: string, agentName: string, lastMessage: Message): Promise<void> { const roomDir = path.join(this.storage.dataDirectory, 'rooms', roomName); const readStatusPath = path.join(roomDir, READ_STATUS_FILENAME); await this.lockService.withLock(`read-status-${roomName}`, async () => { // Ensure room directory exists try { await fs.access(roomDir); } catch (error: any) { if (error.code === 'ENOENT') { // If room doesn't exist, skip updating read status return; } throw error; } let readStatuses: Record<string, ReadStatusInfo> = {}; try { const data = await fs.readFile(readStatusPath, 'utf-8'); readStatuses = JSON.parse(data); } catch (error: any) { if (error.code !== 'ENOENT') { // Ignore corrupted files, start fresh if (!(error instanceof SyntaxError)) { throw error; } } } readStatuses[agentName] = { agentName, lastReadMessageId: lastMessage.id, lastReadTimestamp: new Date().toISOString() }; await fs.writeFile(readStatusPath, JSON.stringify(readStatuses, null, 2)); }); } private async addWaitingAgent(roomName: string, agentName: string, timeout: number): Promise<void> { const roomDir = path.join(this.storage.dataDirectory, 'rooms', roomName); const waitingAgentsPath = path.join(roomDir, WAITING_AGENTS_FILENAME); await this.lockService.withLock(`waiting-agents-${roomName}`, async () => { // Ensure room directory exists try { await fs.access(roomDir); } catch (error: any) { if (error.code === 'ENOENT') { throw new RoomNotFoundError(roomName); } throw error; } let waitingAgents: WaitingAgentInfo[] = []; try { const data = await fs.readFile(waitingAgentsPath, 'utf-8'); waitingAgents = JSON.parse(data); } catch (error: any) { if (error.code !== 'ENOENT') { throw error; } } // Remove any existing entry for this agent waitingAgents = waitingAgents.filter(agent => agent.agentName !== agentName); // Add new entry waitingAgents.push({ agentName, startTime: new Date().toISOString(), timeout }); await fs.writeFile(waitingAgentsPath, JSON.stringify(waitingAgents, null, 2)); }); } private async removeWaitingAgent(roomName: string, agentName: string): Promise<void> { const waitingAgentsPath = path.join(this.storage.dataDirectory, 'rooms', roomName, WAITING_AGENTS_FILENAME); await this.lockService.withLock(`waiting-agents-${roomName}`, async () => { let waitingAgents: WaitingAgentInfo[] = []; try { const data = await fs.readFile(waitingAgentsPath, 'utf-8'); waitingAgents = JSON.parse(data); } catch (error: any) { if (error.code !== 'ENOENT') { throw error; } } // Remove agent from list waitingAgents = waitingAgents.filter(agent => agent.agentName !== agentName); await fs.writeFile(waitingAgentsPath, JSON.stringify(waitingAgents, null, 2)); }); } private async getWaitingAgents(roomName: string): Promise<WaitingAgentInfo[]> { const waitingAgentsPath = path.join(this.storage.dataDirectory, 'rooms', roomName, WAITING_AGENTS_FILENAME); try { const data = await fs.readFile(waitingAgentsPath, 'utf-8'); return JSON.parse(data); } catch (error: any) { if (error.code === 'ENOENT') { return []; } throw error; } } private async sendSystemMessage(roomName: string, message: string): Promise<void> { const messageData: MessageStorageData = { id: generateUUID(), agentName: 'system', message, timestamp: new Date().toISOString(), mentions: [], metadata: { type: 'system' } }; await this.storage.saveMessage(roomName, messageData); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mkXultra/agent-communication-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server