Skip to main content
Glama
mkXultra
by mkXultra
MessageStorage.ts5.68 kB
import * as fs from 'fs/promises'; import * as path from 'path'; import { StorageError } from '../../errors/AppError'; import { Message, MessageStorageData, GetMessagesParams } from './types/messaging.types'; import { getDataDirectory } from '../../utils/dataDir'; import { LockService } from '../../services/LockService'; export class MessageStorage { private readonly dataDir: string; private readonly lockService: LockService; constructor(dataDir: string = getDataDirectory(), lockService?: LockService) { this.dataDir = dataDir; this.lockService = lockService || new LockService(dataDir); } private getMessagesFilePath(roomName: string): string { return path.join(this.dataDir, 'rooms', roomName, 'messages.jsonl'); } private async ensureDirectoryExists(dirPath: string): Promise<void> { try { await fs.mkdir(dirPath, { recursive: true }); } catch (error) { throw new StorageError( 'createDirectory', `Failed to create directory: ${dirPath}` ); } } async saveMessage(roomName: string, messageData: MessageStorageData): Promise<void> { const filePath = this.getMessagesFilePath(roomName); const dirPath = path.dirname(filePath); try { await this.lockService.withLock(`rooms/${roomName}/messages.jsonl`, async () => { await this.ensureDirectoryExists(dirPath); const jsonLine = JSON.stringify(messageData) + '\n'; await fs.appendFile(filePath, jsonLine, 'utf8'); }); } catch (error) { if (error instanceof StorageError) { throw error; } throw new StorageError( 'saveMessage', `Failed to save message: ${error instanceof Error ? error.message : 'Unknown error'}` ); } } async getMessages(params: GetMessagesParams): Promise<{ messages: Message[]; hasMore: boolean }> { const filePath = this.getMessagesFilePath(params.roomName); const limit = params.limit ?? 50; const offset = params.offset ?? 0; try { // Check if file exists try { await fs.access(filePath); } catch { // File doesn't exist, return empty result return { messages: [], hasMore: false }; } const fileContent = await fs.readFile(filePath, 'utf8'); const lines = fileContent.trim().split('\n').filter(line => line.trim()); // Parse all messages const allMessages: Message[] = []; for (const line of lines) { try { const data: MessageStorageData = JSON.parse(line); const message: Message = { ...data, roomName: params.roomName }; allMessages.push(message); } catch (parseError) { // Skip malformed lines continue; } } // Filter by mentions if requested let filteredMessages = allMessages; if (params.mentionsOnly && params.agentName) { filteredMessages = allMessages.filter(msg => msg.mentions.includes(params.agentName!) ); } // Sort by timestamp (newest first) filteredMessages.sort((a, b) => new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime()); // Apply pagination const startIndex = offset; const endIndex = startIndex + limit; const paginatedMessages = filteredMessages.slice(startIndex, endIndex); const hasMore = endIndex < filteredMessages.length; return { messages: paginatedMessages, hasMore }; } catch (error) { if (error instanceof StorageError) { throw error; } throw new StorageError( 'readMessages', `Failed to read messages: ${error instanceof Error ? error.message : 'Unknown error'}` ); } } async getMessageCount(roomName: string): Promise<number> { const filePath = this.getMessagesFilePath(roomName); try { // Check if file exists try { await fs.access(filePath); } catch { return 0; } const fileContent = await fs.readFile(filePath, 'utf8'); const lines = fileContent.trim().split('\n').filter(line => line.trim()); return lines.length; } catch (error) { throw new StorageError( 'getMessageCount', `Failed to count messages: ${error instanceof Error ? error.message : 'Unknown error'}` ); } } async getAllMessages(roomName: string): Promise<Message[]> { const filePath = this.getMessagesFilePath(roomName); try { // Check if file exists try { await fs.access(filePath); } catch { return []; } const fileContent = await fs.readFile(filePath, 'utf8'); const lines = fileContent.trim().split('\n').filter(line => line.trim()); // Parse all messages const allMessages: Message[] = []; for (const line of lines) { try { const data: MessageStorageData = JSON.parse(line); const message: Message = { ...data, roomName }; allMessages.push(message); } catch (parseError) { // Skip malformed lines continue; } } // Sort by timestamp (oldest first for read tracking) allMessages.sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()); return allMessages; } catch (error) { throw new StorageError( 'getAllMessages', `Failed to get all messages: ${error instanceof Error ? error.message : 'Unknown error'}` ); } } get dataDirectory(): string { return this.dataDir; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mkXultra/agent-communication-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server