Skip to main content
Glama
by Coder-RL
message-queue.ts (24.1 kB)
import { EventEmitter } from 'events';
import * as crypto from 'crypto';

/** How often the delivery loop polls active consumers. */
const PROCESSING_INTERVAL_MS = 100;
/** How often queue/producer throughput figures are recomputed. */
const METRICS_INTERVAL_MS = 60000;
/** How often expired messages and stale consumers are swept. */
const CLEANUP_INTERVAL_MS = 300000;
/** A consumer with no heartbeat for this long is marked stopped. */
const STALE_CONSUMER_THRESHOLD_MS = 5 * 60 * 1000;
/** Redelivery delay applied to a nacked message that still has attempts left. */
const NACK_REDELIVERY_DELAY_MS = 30000;
/** Delay used for a `retry` result that does not carry its own `retryDelay`. */
const DEFAULT_RETRY_DELAY_SECONDS = 60;
/** Delivery attempts granted to every message created by `sendMessage`. */
const DEFAULT_MAX_DELIVERY_ATTEMPTS = 3;
/** Largest delay `setTimeout` accepts before it overflows and fires immediately. */
const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;
const CIPHER_ALGORITHM = 'aes-256-cbc';
const CIPHER_IV_BYTES = 16;

/** Envelope stored in `Message.payload` for queues created with `encryption: true`. */
interface EncryptedPayload {
  encrypted: true;
  /** Hex-encoded ciphertext of the JSON-serialised payload. */
  data: string;
  /** Hex-encoded initialisation vector used for this payload. */
  iv: string;
}

/** Extracts a printable message from anything that was thrown. */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** A single unit of work held in a queue. */
export interface Message {
  id: string;
  /** Name of the queue the message was sent to. */
  topic: string;
  payload: any;
  headers: Record<string, string>;
  timestamp: Date;
  producer: string;
  /** Higher values are delivered first. */
  priority: number;
  expiry?: Date;
  deliveryCount: number;
  maxDeliveryAttempts: number;
  /** The message is withheld from consumers until this instant. */
  delayUntil?: Date;
  correlationId?: string;
  replyTo?: string;
  metadata: Record<string, any>;
}

/** Configuration and live metrics of a queue. */
export interface Queue {
  name: string;
  type: 'fifo' | 'priority' | 'delayed' | 'pub-sub';
  maxSize: number;
  maxMessageSize: number;
  messageRetention: number; // seconds
  deadLetterQueue?: string;
  dlqThreshold: number;
  encryption: boolean;
  persistence: boolean;
  replication: number; // number of replicas
  consumerGroups: string[];
  metrics: QueueMetrics;
  created: Date;
  lastModified: Date;
}

export interface QueueMetrics {
  totalMessages: number;
  pendingMessages: number;
  processedMessages: number;
  failedMessages: number;
  averageProcessingTime: number;
  throughputPerSecond: number;
  lastActivity: Date;
}

/** An in-process consumer that is handed batches of messages by the delivery loop. */
export interface Consumer {
  id: string;
  name: string;
  groupId?: string;
  queues: string[];
  batchSize: number;
  prefetch: number;
  ackTimeout: number; // seconds
  maxConcurrency: number;
  filterExpression?: string;
  processingFunction: (messages: Message[]) => Promise<ProcessingResult[]>;
  status: 'active' | 'paused' | 'stopped' | 'error';
  lastHeartbeat: Date;
  metrics: ConsumerMetrics;
}

export interface ConsumerMetrics {
  messagesProcessed: number;
  messagesAcknowledged: number;
  messagesRejected: number;
  averageProcessingTime: number;
  errorRate: number;
  lastError?: string;
}

/** Outcome a consumer reports for one message of a batch. */
export interface ProcessingResult {
  messageId: string;
  status: 'ack' | 'nack' | 'reject' | 'retry';
  error?: string;
  retryDelay?: number; // seconds
  metadata?: Record<string, any>;
}

export interface Producer {
  id: string;
  name: string;
  defaultTopic?: string;
  compressionEnabled: boolean;
  batchingEnabled: boolean;
  batchSize: number;
  batchTimeout: number; // milliseconds
  confirmationRequired: boolean;
  retryPolicy: RetryPolicy;
  metrics: ProducerMetrics;
}

export interface ProducerMetrics {
  /** Cumulative number of messages sent through `sendMessage` with this producer id. */
  messagesSent: number;
  messagesConfirmed: number;
  messagesFailed: number;
  averageLatency: number;
  throughputPerSecond: number;
}

export interface RetryPolicy {
  maxAttempts: number;
  initialDelay: number; // milliseconds
  maxDelay: number; // milliseconds
  backoffMultiplier: number;
  jitter: boolean;
}

export interface Subscription {
  id: string;
  consumerId: string;
  topic: string;
  partition?: number;
  offset: number;
  autoCommit: boolean;
  commitInterval: number; // milliseconds
  sessionTimeout: number; // milliseconds
  heartbeatInterval: number; // milliseconds
  status: 'active' | 'paused' | 'closed';
}

/**
 * In-memory message broker with priority ordering, delayed delivery,
 * dead-letter queues and optional payload encryption.
 *
 * Constructing an instance starts three timers (delivery, metrics, cleanup);
 * call {@link MessageQueue.destroy} to stop them and release all state.
 *
 * Events emitted: `queue-created`, `queue-deleted`, `queue-purged`,
 * `message-sent`, `message-queued`, `message-dropped`, `message-acknowledged`,
 * `message-nacked`, `message-rejected`, `message-retried`,
 * `message-dead-lettered`, `message-expired`, `producer-registered`,
 * `consumer-registered`, `consumer-stopped`, `consumer-error`,
 * `consumer-stale`, `metrics-updated`.
 */
export class MessageQueue extends EventEmitter {
  private queues = new Map<string, Queue>();
  private messages = new Map<string, Map<string, Message>>(); // queueName -> messageId -> message
  private consumers = new Map<string, Consumer>();
  private producers = new Map<string, Producer>();
  private subscriptions = new Map<string, Subscription>();
  private processingMessages = new Map<string, Set<string>>(); // consumerId -> messageIds
  private delayedMessages = new Map<string, NodeJS.Timeout>(); // messageId -> timeout
  private deadLetterQueues = new Map<string, Message[]>();
  private processedAtLastTick = new Map<string, number>(); // queueName -> processedMessages at last metrics tick
  private sentAtLastTick = new Map<string, number>(); // producerId -> messagesSent at last metrics tick
  private encryptionKey: Buffer | null = null;
  private processingInterval: NodeJS.Timeout | null = null;
  private metricsInterval: NodeJS.Timeout | null = null;
  private cleanupInterval: NodeJS.Timeout | null = null;

  constructor() {
    super();
    this.initializeEncryption();
    this.startProcessingLoop();
    this.startMetricsCollection();
    this.startCleanupProcess();
  }

  /** Generates a fresh random 256-bit key; it lives only as long as this instance. */
  private initializeEncryption(): void {
    this.encryptionKey = crypto.randomBytes(32);
  }

  /**
   * Registers a queue (and its dead-letter list, when configured).
   *
   * Re-creating a name replaces the configuration and resets the metrics, but
   * messages still stored under that name (for example after a non-purging
   * `deleteQueue`) and dead letters already collected are kept.
   */
  createQueue(config: Omit<Queue, 'metrics' | 'created' | 'lastModified'>): void {
    const retained = this.messages.get(config.name);
    const retainedCount = retained?.size ?? 0;

    const queue: Queue = {
      ...config,
      metrics: {
        totalMessages: retainedCount,
        pendingMessages: retainedCount,
        processedMessages: 0,
        failedMessages: 0,
        averageProcessingTime: 0,
        throughputPerSecond: 0,
        lastActivity: new Date()
      },
      created: new Date(),
      lastModified: new Date()
    };

    this.queues.set(queue.name, queue);
    this.processedAtLastTick.delete(queue.name);
    if (!retained) {
      this.messages.set(queue.name, new Map());
    }

    // Several queues may share one dead-letter list; never reset an existing one.
    if (queue.deadLetterQueue && !this.deadLetterQueues.has(queue.deadLetterQueue)) {
      this.deadLetterQueues.set(queue.deadLetterQueue, []);
    }

    this.emit('queue-created', queue);
  }

  /**
   * Removes a queue and stops every consumer subscribed to it.
   *
   * @param purge when true the stored messages are discarded as well;
   *   otherwise they stay in memory and are picked up again if the queue is
   *   re-created under the same name.
   * @returns false when no such queue exists.
   */
  deleteQueue(queueName: string, purge: boolean = false): boolean {
    if (!this.queues.has(queueName)) {
      return false;
    }

    for (const consumer of this.consumers.values()) {
      if (consumer.queues.includes(queueName)) {
        this.stopConsumer(consumer.id);
      }
    }

    if (purge) {
      this.messages.delete(queueName);
    }

    this.queues.delete(queueName);
    this.processedAtLastTick.delete(queueName);
    this.emit('queue-deleted', { queueName, purged: purge });
    return true;
  }

  /**
   * Creates a message and enqueues it, immediately or once `delayUntil` passes.
   *
   * @returns the generated message id.
   * @throws Error when the queue does not exist, is full, the payload cannot
   *   be serialised to JSON, or its serialised size exceeds `maxMessageSize`.
   */
  async sendMessage(
    queueName: string,
    payload: any,
    options: {
      headers?: Record<string, string>;
      priority?: number;
      expiry?: Date;
      delayUntil?: Date;
      correlationId?: string;
      replyTo?: string;
      producerId?: string;
    } = {}
  ): Promise<string> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      throw new Error(`Queue not found: ${queueName}`);
    }

    const queueMessages = this.messages.get(queueName);
    if (queueMessages && queueMessages.size >= queue.maxSize) {
      throw new Error(`Queue is full: ${queueName}`);
    }

    // JSON.stringify yields undefined for undefined/function payloads; report
    // that explicitly instead of crashing on `.length`.
    const serialized: string | undefined = JSON.stringify(payload);
    if (serialized === undefined) {
      throw new Error('Payload is not JSON-serializable');
    }

    // The limit is expressed in bytes, so measure the UTF-8 encoding rather
    // than the number of UTF-16 code units.
    const messageSize = Buffer.byteLength(serialized, 'utf8');
    if (messageSize > queue.maxMessageSize) {
      throw new Error(`Message too large: ${messageSize} bytes`);
    }

    const message: Message = {
      id: crypto.randomUUID(),
      topic: queueName,
      payload: queue.encryption ? this.encrypt(payload) : payload,
      headers: options.headers ?? {},
      timestamp: new Date(),
      producer: options.producerId || 'anonymous',
      priority: options.priority ?? 0,
      expiry: options.expiry,
      deliveryCount: 0,
      maxDeliveryAttempts: DEFAULT_MAX_DELIVERY_ATTEMPTS,
      delayUntil: options.delayUntil,
      correlationId: options.correlationId,
      replyTo: options.replyTo,
      metadata: {}
    };

    const delay = message.delayUntil ? message.delayUntil.getTime() - Date.now() : 0;
    if (delay > 0) {
      // Clamp to the timer maximum: an early enqueue is harmless because
      // consumers still skip the message until `delayUntil` has passed.
      const timeout = setTimeout(() => {
        this.delayedMessages.delete(message.id);
        this.addMessageToQueue(queueName, message);
      }, Math.min(delay, MAX_TIMER_DELAY_MS));
      this.delayedMessages.set(message.id, timeout);
    } else {
      this.addMessageToQueue(queueName, message);
    }

    if (options.producerId) {
      const producer = this.producers.get(options.producerId);
      if (producer) {
        producer.metrics.messagesSent++;
      }
    }

    this.emit('message-sent', { queueName, messageId: message.id });
    return message.id;
  }

  /**
   * Stores a message and updates the queue counters. When the queue has been
   * deleted in the meantime (possible for delayed messages) the message is
   * dropped and `message-dropped` is emitted.
   */
  private addMessageToQueue(queueName: string, message: Message): void {
    const queue = this.queues.get(queueName);
    if (!queue) {
      this.emit('message-dropped', { queueName, messageId: message.id, reason: 'Queue not found' });
      return;
    }

    let queueMessages = this.messages.get(queueName);
    if (!queueMessages) {
      queueMessages = new Map();
      this.messages.set(queueName, queueMessages);
    }
    queueMessages.set(message.id, message);

    queue.metrics.totalMessages++;
    queue.metrics.pendingMessages++;
    queue.metrics.lastActivity = new Date();
    queue.lastModified = new Date();

    this.emit('message-queued', { queueName, messageId: message.id });
  }

  /** Registers a producer with zeroed metrics and returns its id. */
  registerProducer(config: Omit<Producer, 'metrics'>): string {
    const producer: Producer = {
      ...config,
      metrics: {
        messagesSent: 0,
        messagesConfirmed: 0,
        messagesFailed: 0,
        averageLatency: 0,
        throughputPerSecond: 0
      }
    };

    this.producers.set(producer.id, producer);
    this.sentAtLastTick.delete(producer.id);
    this.emit('producer-registered', producer);
    return producer.id;
  }

  /**
   * Registers a consumer, creates one active subscription per queue it
   * listens to, and returns its id.
   */
  registerConsumer(config: Omit<Consumer, 'lastHeartbeat' | 'metrics'>): string {
    const consumer: Consumer = {
      ...config,
      lastHeartbeat: new Date(),
      metrics: {
        messagesProcessed: 0,
        messagesAcknowledged: 0,
        messagesRejected: 0,
        averageProcessingTime: 0,
        errorRate: 0
      }
    };

    this.consumers.set(consumer.id, consumer);
    this.processingMessages.set(consumer.id, new Set());

    for (const queueName of consumer.queues) {
      const subscription: Subscription = {
        id: crypto.randomUUID(),
        consumerId: consumer.id,
        topic: queueName,
        offset: 0,
        autoCommit: true,
        commitInterval: 5000,
        sessionTimeout: 30000,
        heartbeatInterval: 3000,
        status: 'active'
      };
      this.subscriptions.set(subscription.id, subscription);
    }

    this.emit('consumer-registered', consumer);
    return consumer.id;
  }

  /**
   * Marks a consumer stopped, closes its subscriptions and releases the
   * messages it had in flight so that other consumers can pick them up.
   *
   * @returns false when no such consumer exists.
   */
  stopConsumer(consumerId: string): boolean {
    const consumer = this.consumers.get(consumerId);
    if (!consumer) {
      return false;
    }

    consumer.status = 'stopped';

    for (const subscription of this.subscriptions.values()) {
      if (subscription.consumerId === consumerId) {
        subscription.status = 'closed';
      }
    }

    const processingSet = this.processingMessages.get(consumerId);
    if (processingSet) {
      for (const messageId of processingSet) {
        for (const queueMessages of this.messages.values()) {
          const message = queueMessages.get(messageId);
          if (message) {
            // The interrupted attempt does not count against the message.
            message.deliveryCount = 0;
            break;
          }
        }
      }
      processingSet.clear();
    }

    this.emit('consumer-stopped', { consumerId });
    return true;
  }

  private startProcessingLoop(): void {
    this.processingInterval = setInterval(() => {
      void this.processMessages();
    }, PROCESSING_INTERVAL_MS);
  }

  /**
   * Runs one delivery pass over every active consumer. A consumer whose pass
   * throws is switched to `error` and `consumer-error` is emitted.
   */
  private async processMessages(): Promise<void> {
    for (const consumer of this.consumers.values()) {
      if (consumer.status !== 'active') {
        continue;
      }

      try {
        await this.processConsumerMessages(consumer);
      } catch (error) {
        const reason = toErrorMessage(error);
        consumer.status = 'error';
        consumer.metrics.lastError = reason;
        this.emit('consumer-error', { consumerId: consumer.id, error: reason });
      }
    }
  }

  /**
   * Hands the next batch to one consumer and settles the results.
   *
   * Passes may overlap (the timer does not wait for a previous pass), so the
   * in-flight set is what keeps a message from being handed out twice.
   */
  private async processConsumerMessages(consumer: Consumer): Promise<void> {
    const processingSet = this.processingMessages.get(consumer.id);
    if (!processingSet || processingSet.size >= consumer.maxConcurrency) {
      return;
    }

    const capacity = Math.min(consumer.batchSize, consumer.maxConcurrency - processingSet.size);
    const batch: Message[] = [];
    for (const queueName of consumer.queues) {
      if (batch.length >= capacity) {
        break;
      }
      batch.push(...this.getMessagesForConsumer(queueName, consumer, capacity - batch.length));
    }

    if (batch.length === 0) {
      // An idle consumer is alive; only one stuck on an unfinished batch
      // should ever be reported as stale.
      if (processingSet.size === 0) {
        consumer.lastHeartbeat = new Date();
      }
      return;
    }

    for (const message of batch) {
      processingSet.add(message.id);
      message.deliveryCount++;
    }

    const startTime = Date.now();
    try {
      try {
        const results = await consumer.processingFunction(batch);
        const processingTime = Date.now() - startTime;

        consumer.lastHeartbeat = new Date();
        consumer.metrics.messagesProcessed += batch.length;
        consumer.metrics.averageProcessingTime =
          (consumer.metrics.averageProcessingTime + processingTime) / 2;

        await this.handleProcessingResults(batch, results, consumer);
      } catch (error) {
        await this.handleProcessingFailure(batch, consumer, toErrorMessage(error));
      }
    } finally {
      // Always release the batch, even when failure handling itself throws,
      // otherwise these messages would stay in flight forever.
      for (const message of batch) {
        processingSet.delete(message.id);
      }
    }
  }

  /**
   * Selects up to `limit` deliverable messages of a queue: not in flight with
   * any consumer, not expired, not delayed, and passing the consumer's filter.
   * Ordered by priority (highest first), then by age (oldest first).
   */
  private getMessagesForConsumer(queueName: string, consumer: Consumer, limit: number): Message[] {
    const queue = this.queues.get(queueName);
    const queueMessages = this.messages.get(queueName);
    if (!queue || !queueMessages) {
      return [];
    }

    const inFlight = new Set<string>();
    for (const ids of this.processingMessages.values()) {
      for (const id of ids) {
        inFlight.add(id);
      }
    }

    const now = new Date();
    const filterExpression = consumer.filterExpression;

    return Array.from(queueMessages.values())
      .filter(message => {
        if (inFlight.has(message.id)) {
          return false;
        }
        if (message.expiry && message.expiry < now) {
          return false;
        }
        if (message.delayUntil && message.delayUntil > now) {
          return false;
        }
        return filterExpression ? this.evaluateFilterExpression(message, filterExpression) : true;
      })
      .sort((a, b) =>
        a.priority !== b.priority
          ? b.priority - a.priority
          : a.timestamp.getTime() - b.timestamp.getTime()
      )
      .slice(0, limit);
  }

  /**
   * Placeholder for consumer-side filtering: every message currently passes.
   * NOTE(review): a real implementation needs a safe expression evaluator;
   * do not use `eval` on the expression.
   */
  private evaluateFilterExpression(message: Message, expression: string): boolean {
    return true;
  }

  /**
   * Applies the consumer's results to a batch.
   *
   * A result is matched to its message by `messageId`. For consumers that do
   * not echo real ids, the result at the same index is used, provided it does
   * not name a different message of the batch. A message without a usable
   * result, or with an unknown status, is nacked.
   */
  private async handleProcessingResults(
    messages: Message[],
    results: ProcessingResult[],
    consumer: Consumer
  ): Promise<void> {
    const reported: ProcessingResult[] = Array.isArray(results) ? results : [];
    const batchIds = new Set(messages.map(message => message.id));

    const resultsById = new Map<string, ProcessingResult>();
    for (const result of reported) {
      if (result && batchIds.has(result.messageId)) {
        resultsById.set(result.messageId, result);
      }
    }

    for (let i = 0; i < messages.length; i++) {
      const message = messages[i];
      const positional = reported[i];
      const positionalIsUsable = Boolean(positional) && !batchIds.has(positional.messageId);
      const result: ProcessingResult =
        resultsById.get(message.id) ??
        (positionalIsUsable ? positional : { messageId: message.id, status: 'nack' });

      switch (result.status) {
        case 'ack':
          await this.acknowledgeMessage(message, consumer);
          break;
        case 'nack':
          await this.negativeAcknowledgeMessage(message, consumer, result.error);
          break;
        case 'reject':
          await this.rejectMessage(message, consumer, result.error);
          break;
        case 'retry':
          // `||` on purpose: a retryDelay of 0 has always meant "use the default".
          await this.retryMessage(message, consumer, result.retryDelay || DEFAULT_RETRY_DELAY_SECONDS);
          break;
        default:
          await this.negativeAcknowledgeMessage(message, consumer, result.error);
          break;
      }
    }
  }

  /** Nacks every message of a batch whose processing function threw. */
  private async handleProcessingFailure(
    messages: Message[],
    consumer: Consumer,
    error: string
  ): Promise<void> {
    for (const message of messages) {
      await this.negativeAcknowledgeMessage(message, consumer, error);
    }
  }

  /** Removes a successfully processed message and updates the counters. */
  private async acknowledgeMessage(message: Message, consumer: Consumer): Promise<void> {
    const removed = this.messages.get(message.topic)?.delete(message.id) ?? false;

    const queue = this.queues.get(message.topic);
    if (queue && removed) {
      queue.metrics.processedMessages++;
      queue.metrics.pendingMessages = Math.max(0, queue.metrics.pendingMessages - 1);
    }
    consumer.metrics.messagesAcknowledged++;

    this.emit('message-acknowledged', {
      messageId: message.id,
      queueName: message.topic,
      consumerId: consumer.id
    });
  }

  /**
   * Dead-letters a message that has used up its delivery attempts, otherwise
   * schedules it for redelivery. Does nothing when the queue is gone or the
   * message has already been settled.
   */
  private async negativeAcknowledgeMessage(message: Message, consumer: Consumer, error?: string): Promise<void> {
    const queue = this.queues.get(message.topic);
    const stillQueued = this.messages.get(message.topic)?.has(message.id) ?? false;
    if (!queue || !stillQueued) {
      return;
    }

    if (message.deliveryCount >= message.maxDeliveryAttempts) {
      await this.sendToDeadLetterQueue(message, queue, error);
    } else {
      message.delayUntil = new Date(Date.now() + NACK_REDELIVERY_DELAY_MS);
    }

    consumer.metrics.messagesRejected++;
    queue.metrics.failedMessages++;

    this.emit('message-nacked', {
      messageId: message.id,
      queueName: message.topic,
      consumerId: consumer.id,
      error
    });
  }

  /** Dead-letters a message immediately, regardless of remaining attempts. */
  private async rejectMessage(message: Message, consumer: Consumer, error?: string): Promise<void> {
    const queue = this.queues.get(message.topic);
    const stillQueued = this.messages.get(message.topic)?.has(message.id) ?? false;
    if (!queue || !stillQueued) {
      return;
    }

    await this.sendToDeadLetterQueue(message, queue, error);
    consumer.metrics.messagesRejected++;
    queue.metrics.failedMessages++;

    this.emit('message-rejected', {
      messageId: message.id,
      queueName: message.topic,
      consumerId: consumer.id,
      error
    });
  }

  /** Schedules a redelivery that does not count as a delivery attempt. */
  private async retryMessage(message: Message, consumer: Consumer, delaySeconds: number): Promise<void> {
    message.delayUntil = new Date(Date.now() + delaySeconds * 1000);
    message.deliveryCount = Math.max(0, message.deliveryCount - 1);

    this.emit('message-retried', {
      messageId: message.id,
      queueName: message.topic,
      consumerId: consumer.id,
      delaySeconds
    });
  }

  /**
   * Moves a message out of its queue. It is appended to the configured
   * dead-letter list when one exists; otherwise it is simply discarded.
   */
  private async sendToDeadLetterQueue(message: Message, queue: Queue, error?: string): Promise<void> {
    if (queue.deadLetterQueue) {
      const dlqMessages = this.deadLetterQueues.get(queue.deadLetterQueue);
      if (dlqMessages) {
        message.metadata.deadLetterReason = error || 'Max delivery attempts exceeded';
        message.metadata.originalQueue = queue.name;
        message.metadata.deadLetterTimestamp = new Date();
        dlqMessages.push(message);
      }
    }

    if (this.messages.get(message.topic)?.delete(message.id)) {
      queue.metrics.pendingMessages = Math.max(0, queue.metrics.pendingMessages - 1);
    }

    this.emit('message-dead-lettered', {
      messageId: message.id,
      originalQueue: queue.name,
      deadLetterQueue: queue.deadLetterQueue,
      reason: error
    });
  }

  /**
   * Encrypts a JSON-serialisable value with AES-256-CBC under the instance key
   * and a fresh random IV, which is stored next to the ciphertext.
   * Returns the input unchanged when no key is available.
   */
  private encrypt(data: unknown): EncryptedPayload | unknown {
    if (!this.encryptionKey) {
      return data;
    }

    const json = JSON.stringify(data) ?? 'null';
    const iv = crypto.randomBytes(CIPHER_IV_BYTES);
    const cipher = crypto.createCipheriv(CIPHER_ALGORITHM, this.encryptionKey, iv);
    const encrypted = cipher.update(json, 'utf8', 'hex') + cipher.final('hex');

    return { encrypted: true, data: encrypted, iv: iv.toString('hex') };
  }

  /**
   * Reverses {@link MessageQueue.encrypt}. Values that are not an encrypted
   * envelope (including null and primitives) are returned unchanged.
   * NOTE(review): nothing in this class calls this; consumers currently
   * receive the encrypted envelope as their payload.
   */
  private decrypt(encryptedData: any): any {
    if (!this.encryptionKey || !encryptedData || !encryptedData.encrypted) {
      return encryptedData;
    }

    const iv = Buffer.from(encryptedData.iv, 'hex');
    const decipher = crypto.createDecipheriv(CIPHER_ALGORITHM, this.encryptionKey, iv);
    const decrypted = decipher.update(encryptedData.data, 'hex', 'utf8') + decipher.final('utf8');

    return JSON.parse(decrypted);
  }

  private startMetricsCollection(): void {
    this.metricsInterval = setInterval(() => {
      this.updateMetrics();
    }, METRICS_INTERVAL_MS);
  }

  /**
   * Recomputes pending counts and per-second throughput. Throughput is the
   * growth of the cumulative counter since the previous tick divided by the
   * tick length; the cumulative counters themselves are left untouched.
   */
  private updateMetrics(): void {
    const intervalSeconds = METRICS_INTERVAL_MS / 1000;

    for (const queue of this.queues.values()) {
      queue.metrics.pendingMessages = this.messages.get(queue.name)?.size ?? 0;

      const processed = queue.metrics.processedMessages;
      const previous = this.processedAtLastTick.get(queue.name) ?? 0;
      queue.metrics.throughputPerSecond = Math.max(0, processed - previous) / intervalSeconds;
      this.processedAtLastTick.set(queue.name, processed);
    }

    for (const producer of this.producers.values()) {
      const sent = producer.metrics.messagesSent;
      const previous = this.sentAtLastTick.get(producer.id) ?? 0;
      producer.metrics.throughputPerSecond = Math.max(0, sent - previous) / intervalSeconds;
      this.sentAtLastTick.set(producer.id, sent);
    }

    this.emit('metrics-updated');
  }

  private startCleanupProcess(): void {
    this.cleanupInterval = setInterval(() => {
      this.cleanupExpiredMessages();
      this.cleanupStaleConsumers();
    }, CLEANUP_INTERVAL_MS);
  }

  /** Deletes every message whose expiry has passed and emits `message-expired`. */
  private cleanupExpiredMessages(): void {
    const now = new Date();

    for (const [queueName, queueMessages] of this.messages) {
      // Messages can outlive their queue after a non-purging deleteQueue.
      const queue = this.queues.get(queueName);

      for (const [messageId, message] of queueMessages) {
        if (!message.expiry || message.expiry >= now) {
          continue;
        }

        queueMessages.delete(messageId);
        if (queue) {
          queue.metrics.pendingMessages = Math.max(0, queue.metrics.pendingMessages - 1);
        }
        this.emit('message-expired', { messageId, queueName });
      }
    }
  }

  /** Stops active consumers whose last heartbeat is older than the stale threshold. */
  private cleanupStaleConsumers(): void {
    const now = Date.now();

    for (const consumer of this.consumers.values()) {
      const timeSinceHeartbeat = now - consumer.lastHeartbeat.getTime();
      if (timeSinceHeartbeat > STALE_CONSUMER_THRESHOLD_MS && consumer.status === 'active') {
        consumer.status = 'stopped';
        this.emit('consumer-stale', { consumerId: consumer.id });
      }
    }
  }

  // Public API methods

  getQueues(): Queue[] {
    return Array.from(this.queues.values());
  }

  getQueue(queueName: string): Queue | null {
    return this.queues.get(queueName) || null;
  }

  getConsumers(): Consumer[] {
    return Array.from(this.consumers.values());
  }

  getProducers(): Producer[] {
    return Array.from(this.producers.values());
  }

  /** Returns up to `limit` stored messages of a queue, newest first. */
  getQueueMessages(queueName: string, limit: number = 100): Message[] {
    const queueMessages = this.messages.get(queueName);
    if (!queueMessages) {
      return [];
    }

    return Array.from(queueMessages.values())
      .sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime())
      .slice(0, limit);
  }

  /**
   * Returns the dead letters collected under `dlqName`. For a known name this
   * is the live internal array, so changes made by the caller are visible to
   * the broker.
   */
  getDeadLetterMessages(dlqName: string): Message[] {
    return this.deadLetterQueues.get(dlqName) || [];
  }

  /**
   * Discards every stored message of a queue.
   *
   * @returns false when nothing is stored under that name.
   */
  purgeQueue(queueName: string): boolean {
    const queueMessages = this.messages.get(queueName);
    if (!queueMessages) {
      return false;
    }

    const messageCount = queueMessages.size;
    queueMessages.clear();

    const queue = this.queues.get(queueName);
    if (queue) {
      queue.metrics.pendingMessages = 0;
    }

    this.emit('queue-purged', { queueName, messageCount });
    return true;
  }

  /** Aggregated counters across all queues, consumers and producers. */
  getStats(): any {
    const queues = Array.from(this.queues.values());
    const consumers = Array.from(this.consumers.values());
    const producers = Array.from(this.producers.values());

    return {
      queues: {
        total: queues.length,
        totalMessages: queues.reduce((sum, q) => sum + q.metrics.totalMessages, 0),
        pendingMessages: queues.reduce((sum, q) => sum + q.metrics.pendingMessages, 0),
        processedMessages: queues.reduce((sum, q) => sum + q.metrics.processedMessages, 0)
      },
      consumers: {
        total: consumers.length,
        active: consumers.filter(c => c.status === 'active').length,
        stopped: consumers.filter(c => c.status === 'stopped').length,
        error: consumers.filter(c => c.status === 'error').length
      },
      producers: {
        total: producers.length,
        totalMessagesSent: producers.reduce((sum, p) => sum + p.metrics.messagesSent, 0)
      },
      deadLetterQueues: this.deadLetterQueues.size,
      delayedMessages: this.delayedMessages.size
    };
  }

  /** Stops all timers, drops all state and removes every event listener. */
  destroy(): void {
    if (this.processingInterval) {
      clearInterval(this.processingInterval);
      this.processingInterval = null;
    }
    if (this.metricsInterval) {
      clearInterval(this.metricsInterval);
      this.metricsInterval = null;
    }
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }

    for (const timeout of this.delayedMessages.values()) {
      clearTimeout(timeout);
    }

    this.queues.clear();
    this.messages.clear();
    this.consumers.clear();
    this.producers.clear();
    this.subscriptions.clear();
    this.processingMessages.clear();
    this.delayedMessages.clear();
    this.deadLetterQueues.clear();
    this.processedAtLastTick.clear();
    this.sentAtLastTick.clear();

    this.removeAllListeners();
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Coder-RL/Claude_MCPServer_Dev1'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.