Skip to main content
Glama
clustering-worker.ts•8.5 kB
import { pathToFileURL } from 'node:url';

import { type Job, Worker } from 'bullmq';
import Redis from 'ioredis';

import { config } from '../config/index.js';
import { createDatabase } from '../database/client.js';
import { ClusteringService } from '../services/clustering-service.js';
import type { QueueService } from '../services/queue-service.js';

/**
 * Payload of a job on the `clustering` queue.
 *
 * `type` selects which operation the worker performs; the remaining fields are
 * only read by some of the operations (see the per-field notes).
 */
export interface ClusteringJob {
  type: 'full-clustering' | 'incremental' | 'merge-clusters' | 'split-clusters';
  /**
   * Forwarded to `ClusteringService.clusterMemories` for `full-clustering` jobs.
   * NOTE(review): job data goes through the queue, so the `Date` fields
   * presumably arrive as serialized strings — confirm against the producer.
   */
  filters?: {
    type?: string;
    userId?: string;
    startDate?: Date;
    endDate?: Date;
    minImportance?: number;
  };
  /** Required (non-empty) for `incremental` jobs; ignored by the other types. */
  memoryIds?: string[];
  /** Forwarded to the clustering service for `full-clustering` and `incremental` jobs. */
  config?: {
    epsilon?: number;
    minPoints?: number;
    minClusterSize?: number;
  };
}

/**
 * BullMQ worker that consumes the `clustering` queue and delegates each job to
 * a {@link ClusteringService}.
 *
 * Constructing an instance starts the worker immediately (`autorun: true`) and
 * registers SIGTERM/SIGINT handlers that call {@link ClusteringWorker.shutdown}.
 */
export class ClusteringWorker {
  private readonly worker: Worker<ClusteringJob>;
  private readonly clusteringService: ClusteringService;
  private readonly connection: Redis;

  /** Memoized shutdown so that repeated calls/signals close resources only once. */
  private shutdownPromise: Promise<void> | null = null;

  /**
   * Shared SIGTERM/SIGINT listener. Kept as a stable reference so it can be
   * removed again in `shutdown`, and so a failed shutdown is logged instead of
   * surfacing as an unhandled promise rejection.
   */
  private readonly handleSignal = (): void => {
    this.shutdown().catch((error: unknown) => {
      console.error('[ClusteringWorker] Shutdown failed:', error);
    });
  };

  /**
   * @param connection - Redis connection to use. When omitted, a new one is
   *   created from `config.REDIS_URL` (falling back to `redis://localhost:6379`).
   *   Note that `shutdown` quits this connection in either case.
   * @param queueService - Passed through to the {@link ClusteringService}.
   */
  constructor(connection?: Redis, queueService?: QueueService) {
    const db = createDatabase(config.MEMORY_DB_URL);
    this.clusteringService = new ClusteringService(db, queueService);

    // Use provided connection or create new one
    this.connection =
      connection ??
      new Redis(config.REDIS_URL || 'redis://localhost:6379', {
        maxRetriesPerRequest: null,
        enableReadyCheck: false,
      });

    // Initialize worker
    this.worker = new Worker<ClusteringJob>('clustering', (job) => this.processClusteringJob(job), {
      connection: this.connection,
      concurrency: 1, // Single concurrency to avoid conflicts
      autorun: true,
    });

    this.setupEventHandlers();
  }

  /**
   * Runs one clustering job: dispatches on `job.data.type`, reports progress,
   * and returns the operation's result extended with `jobType` and
   * `processingTimeMs`.
   *
   * @throws Error when an `incremental` job has no memory IDs or the job type
   *   is unknown; any error from the clustering service is logged to the job
   *   and rethrown unchanged.
   */
  private async processClusteringJob(job: Job<ClusteringJob>): Promise<Record<string, unknown>> {
    const { type, filters, memoryIds, config: clusterConfig } = job.data;
    const startTime = Date.now();

    try {
      await job.log(`Starting ${type} clustering job`);
      await job.updateProgress(10);

      let result: Record<string, unknown>;
      switch (type) {
        case 'full-clustering':
          result = await this.performFullClustering(job, filters, clusterConfig);
          break;
        case 'incremental':
          if (!memoryIds || memoryIds.length === 0) {
            throw new Error('Memory IDs required for incremental clustering');
          }
          result = await this.performIncrementalClustering(job, memoryIds, clusterConfig);
          break;
        case 'merge-clusters':
          result = await this.performClusterMerging(job);
          break;
        case 'split-clusters':
          result = await this.performClusterSplitting(job);
          break;
        default: {
          // Compile-time exhaustiveness check; still guards against bad runtime data.
          const unknownType: never = type;
          throw new Error(`Unknown clustering job type: ${String(unknownType)}`);
        }
      }

      const duration = Date.now() - startTime;
      await job.log(`Clustering job completed in ${duration}ms`);
      await job.updateProgress(100);

      return {
        ...result,
        jobType: type,
        processingTimeMs: duration,
      };
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      // Best-effort: a failure to write the job log must not replace the real error.
      await job.log(`Clustering job failed: ${message}`).catch(() => undefined);
      throw error;
    }
  }

  /**
   * Full clustering pass followed by a merge and a split post-processing step.
   *
   * @returns The service's clustering stats plus merge/split counts, the final
   *   cluster count and the first 10 entries of the cluster summary.
   */
  private async performFullClustering(
    job: Job,
    filters?: Record<string, unknown>,
    clusterConfig?: Record<string, unknown>
  ): Promise<Record<string, unknown>> {
    await job.log('Performing full DBSCAN clustering on database');
    await job.updateProgress(20);

    const stats = await this.clusteringService.clusterMemories(filters, clusterConfig);
    await job.updateProgress(60);

    // Post-processing: merge and split as needed.
    // NOTE(review): 0.85 is presumably a similarity threshold, and (100, 0.4)
    // a size limit and coherence threshold — confirm against ClusteringService.
    const mergedCount = await this.clusteringService.mergeSimilarClusters(0.85);
    await job.log(`Merged ${mergedCount} similar clusters`);
    await job.updateProgress(80);

    const splitCount = await this.clusteringService.splitLargeClusters(100, 0.4);
    await job.log(`Split ${splitCount} large incoherent clusters`);
    await job.updateProgress(90);

    // Get final cluster summary
    const clustersSummary = await this.clusteringService.getClustersSummary();

    return {
      ...stats,
      mergedClusters: mergedCount,
      splitClusters: splitCount,
      finalClusterCount: clustersSummary.length,
      // First 10 entries in the order the service returns them.
      clustersSummary: clustersSummary.slice(0, 10),
    };
  }

  /**
   * Clusters only the given memories via `ClusteringService.clusterNewMemories`.
   *
   * @returns A shallow copy of the stats returned by the service.
   */
  private async performIncrementalClustering(
    job: Job,
    memoryIds: string[],
    clusterConfig?: Record<string, unknown>
  ): Promise<Record<string, unknown>> {
    await job.log(`Performing incremental clustering for ${memoryIds.length} memories`);
    await job.updateProgress(30);

    const stats = await this.clusteringService.clusterNewMemories(memoryIds, clusterConfig);
    await job.updateProgress(90);

    return { ...stats };
  }

  /**
   * Merges similar clusters and reports the cluster count before and after.
   *
   * @returns Initial/final cluster counts, the number of merges reported by the
   *   service, and the resulting reduction in cluster count.
   */
  private async performClusterMerging(job: Job): Promise<Record<string, unknown>> {
    await job.log('Merging similar clusters');
    await job.updateProgress(30);

    const beforeSummary = await this.clusteringService.getClustersSummary();
    const initialCount = beforeSummary.length;
    await job.updateProgress(50);

    const mergedCount = await this.clusteringService.mergeSimilarClusters(0.8);
    await job.updateProgress(80);

    const afterSummary = await this.clusteringService.getClustersSummary();
    const finalCount = afterSummary.length;

    await job.log(`Merged ${mergedCount} cluster pairs, reduced from ${initialCount} to ${finalCount} clusters`);

    return {
      initialClusterCount: initialCount,
      finalClusterCount: finalCount,
      mergedPairs: mergedCount,
      reduction: initialCount - finalCount,
    };
  }

  /**
   * Splits large clusters and reports the cluster count before and after.
   *
   * @returns Initial/final cluster counts, the number of splits reported by the
   *   service, and the resulting increase in cluster count.
   */
  private async performClusterSplitting(job: Job): Promise<Record<string, unknown>> {
    await job.log('Splitting large incoherent clusters');
    await job.updateProgress(30);

    const beforeSummary = await this.clusteringService.getClustersSummary();
    const initialCount = beforeSummary.length;
    await job.updateProgress(50);

    const splitCount = await this.clusteringService.splitLargeClusters(50, 0.4);
    await job.updateProgress(80);

    const afterSummary = await this.clusteringService.getClustersSummary();
    const finalCount = afterSummary.length;

    await job.log(`Split ${splitCount} clusters, increased from ${initialCount} to ${finalCount} clusters`);

    return {
      initialClusterCount: initialCount,
      finalClusterCount: finalCount,
      splitClusters: splitCount,
      increase: finalCount - initialCount,
    };
  }

  /** Wires console logging for worker lifecycle events and the process signal handlers. */
  private setupEventHandlers(): void {
    this.worker.on('completed', (job) => {
      console.log(`[ClusteringWorker] Job ${job.id} completed`);
    });

    this.worker.on('failed', (job, error) => {
      console.error(`[ClusteringWorker] Job ${job?.id} failed:`, error.message);
    });

    this.worker.on('active', (job) => {
      console.log(`[ClusteringWorker] Job ${job.id} started`);
    });

    this.worker.on('stalled', (jobId) => {
      console.warn(`[ClusteringWorker] Job ${jobId} stalled`);
    });

    this.worker.on('error', (error) => {
      console.error('[ClusteringWorker] Worker error:', error);
    });

    // Graceful shutdown
    process.on('SIGTERM', this.handleSignal);
    process.on('SIGINT', this.handleSignal);
  }

  /**
   * Aggregates the current cluster summary into a few headline numbers.
   *
   * @returns Cluster count, total and average cluster size, the largest cluster
   *   size (0 when there are no clusters) and up to 5 distinct types in the
   *   order they are first encountered.
   */
  async getMetrics() {
    // Get clustering-specific metrics
    const clustersSummary = await this.clusteringService.getClustersSummary();

    const totalClusteredMemories = clustersSummary.reduce((sum, c) => sum + c.size, 0);
    // Folded rather than spread into Math.max so a very large summary cannot
    // exceed the engine's argument-count limit.
    const largestCluster = clustersSummary.reduce((max, c) => Math.max(max, c.size), 0);

    return {
      totalClusters: clustersSummary.length,
      totalClusteredMemories,
      largestCluster,
      averageClusterSize: clustersSummary.length > 0 ? totalClusteredMemories / clustersSummary.length : 0,
      topTypes: [...new Set(clustersSummary.flatMap((c) => c.types))].slice(0, 5),
    };
  }

  /** Pauses the underlying BullMQ worker. */
  async pause(): Promise<void> {
    await this.worker.pause();
    console.log('[ClusteringWorker] Worker paused');
  }

  /** Resumes the underlying BullMQ worker. */
  async resume(): Promise<void> {
    await this.worker.resume();
    console.log('[ClusteringWorker] Worker resumed');
  }

  /**
   * Closes the worker and quits the Redis connection.
   *
   * Idempotent: concurrent or repeated calls (e.g. SIGINT followed by SIGTERM)
   * share the same in-flight shutdown instead of closing resources twice.
   */
  async shutdown(): Promise<void> {
    if (this.shutdownPromise === null) {
      this.shutdownPromise = this.closeResources();
    }
    return this.shutdownPromise;
  }

  /** Performs the actual teardown; only ever invoked once, through `shutdown`. */
  private async closeResources(): Promise<void> {
    // Drop our signal listeners so they do not outlive the worker.
    process.off('SIGTERM', this.handleSignal);
    process.off('SIGINT', this.handleSignal);

    console.log('[ClusteringWorker] Closing worker...');
    try {
      await this.worker.close();
    } finally {
      // Release the Redis connection even when closing the worker failed.
      await this.connection.quit();
    }
    console.log('[ClusteringWorker] Worker closed');
  }
}

// Start worker if run directly.
// pathToFileURL yields the same encoded, platform-correct file URL that
// import.meta.url uses, so the check also works on Windows and for paths
// containing characters that are percent-encoded in URLs.
const entryPoint = process.argv[1];
if (entryPoint !== undefined && import.meta.url === pathToFileURL(entryPoint).href) {
  new ClusteringWorker();
  console.log('[ClusteringWorker] Worker started');
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/scanadi/mcp-ai-memory'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.