embedding-worker.ts
import { type Job, Worker } from 'bullmq';
import Redis from 'ioredis';
import { sql } from 'kysely';
import { config } from '../config/index.js';
import { db } from '../database/index.js';
import { EmbeddingService } from '../services/embedding-service.js';
import type { EmbeddingJob } from '../services/queue-service.js';
import { extractTextForEmbedding } from '../utils/text-extraction.js';

export class EmbeddingWorker {
  private worker: Worker<EmbeddingJob>;
  private embeddingService: EmbeddingService;
  private connection: Redis | null = null;

  constructor(connection?: Redis) {
    this.embeddingService = new EmbeddingService();

    // Use provided connection or create new one
    this.connection =
      connection ||
      new Redis(config.REDIS_URL || 'redis://localhost:6379', {
        maxRetriesPerRequest: null,
        enableReadyCheck: false,
      });

    // Initialize worker
    if (!this.connection) {
      throw new Error('Redis connection is required for embedding worker');
    }

    this.worker = new Worker<EmbeddingJob>('embedding-generation', this.processEmbedding.bind(this), {
      connection: this.connection,
      concurrency: config.WORKER_CONCURRENCY || 3, // Process 3 jobs concurrently
      limiter: {
        max: 10, // Max 10 jobs
        duration: 1000, // per second
      },
      autorun: true,
    });

    this.setupEventHandlers();
  }

  private async processEmbedding(job: Job<EmbeddingJob>): Promise<void> {
    const { memoryId } = job.data;
    const startTime = Date.now();

    try {
      // Log job start
      await job.log(`Starting embedding generation for memory ${memoryId}`);
      await job.updateProgress(10);

      // Check if embedding already exists and get memory details
      const existingMemory = await db
        .selectFrom('memories')
        .select(['id', 'embedding', 'content', 'tags', 'type', 'source'])
        .where('id', '=', memoryId)
        .executeTakeFirst();

      if (existingMemory?.embedding) {
        await job.log('Embedding already exists, skipping generation');
        return;
      }

      if (!existingMemory) {
        throw new Error(`Memory ${memoryId} not found`);
      }

      await job.updateProgress(30);

      // Generate embedding from extracted text for better semantic search
      await job.log('Extracting text and generating embedding...');

      // Parse the content if it's stored as JSON string
      let parsedContent: unknown;
      try {
        parsedContent =
          typeof existingMemory.content === 'string'
            ? JSON.parse(existingMemory.content)
            : existingMemory.content;
      } catch {
        parsedContent = existingMemory.content;
      }

      const textForEmbedding = extractTextForEmbedding(
        parsedContent,
        existingMemory.tags || undefined,
        existingMemory.type
      );

      const embedding = await this.embeddingService.generateEmbedding(textForEmbedding);
      await job.updateProgress(70);

      // Store embedding in database with dimension (as PostgreSQL vector format)
      await job.log('Storing embedding in database...');
      const embeddingString = `[${embedding.join(',')}]`;

      await db
        .updateTable('memories')
        .set({
          embedding: embeddingString, // Store as PostgreSQL vector format
          embedding_dimension: embedding.length,
          updated_at: new Date(),
        })
        .where('id', '=', memoryId)
        .execute();

      await job.updateProgress(90);

      // Cache the embedding if Redis is available
      if (this.connection && config.REDIS_URL) {
        const cacheKey = `embedding:${memoryId}`;
        await this.connection.setex(cacheKey, config.LONG_CACHE_TTL, JSON.stringify(embedding));
      }

      await job.updateProgress(100);

      const duration = Date.now() - startTime;
      await job.log(`Embedding generated successfully in ${duration}ms`);

      // Update metrics
      await this.updateMetrics('success', duration);
    } catch (error) {
      const duration = Date.now() - startTime;
      await this.updateMetrics('failure', duration);

      // Log error details
      await job.log(`Error: ${(error as Error).message}`);

      // Check if this is a retryable error
      if (this.isRetryableError(error)) {
        throw error; // BullMQ will retry
      } else {
        // Non-retryable error, move to failed state
        await this.handleNonRetryableError(job, error);
        return; // Don't throw, job is considered "completed" but flagged
      }
    }
  }

  private setupEventHandlers(): void {
    this.worker.on('completed', (job) => {
      console.log(`[EmbeddingWorker] Job ${job.id} completed`);
    });

    this.worker.on('failed', (job, error) => {
      console.error(`[EmbeddingWorker] Job ${job?.id} failed:`, error.message);
    });

    this.worker.on('active', (job) => {
      console.log(`[EmbeddingWorker] Job ${job.id} started`);
    });

    this.worker.on('stalled', (jobId) => {
      console.warn(`[EmbeddingWorker] Job ${jobId} stalled`);
    });

    this.worker.on('error', (error) => {
      console.error('[EmbeddingWorker] Worker error:', error);
    });

    // Graceful shutdown
    process.on('SIGTERM', () => this.shutdown());
    process.on('SIGINT', () => this.shutdown());
  }

  private isRetryableError(error: unknown): boolean {
    // Network errors, timeouts, rate limits are retryable
    const retryableMessages = [
      'ECONNREFUSED',
      'ETIMEDOUT',
      'ENOTFOUND',
      'rate limit',
      'too many requests',
      '429',
      '503',
      '504',
    ];

    const errorMessage = (error as Error).message?.toLowerCase() || '';
    return retryableMessages.some((msg) => errorMessage.includes(msg.toLowerCase()));
  }

  private async handleNonRetryableError(job: Job<EmbeddingJob>, error: unknown): Promise<void> {
    // Sanitize error message to prevent injection
    const rawMsg = String((error as Error).message || 'Unknown error').slice(0, 500);
    const sanitizedError = rawMsg
      .split('')
      .filter((ch) => {
        const code = ch.charCodeAt(0);
        return code >= 32 && code !== 127; // strip control chars
      })
      .join('')
      .replace(/'/g, "''"); // Escape single quotes for SQL safety

    // Create safe error data object
    const errorData = JSON.stringify({
      embeddingError: sanitizedError,
      timestamp: new Date().toISOString(),
    });

    // Validate JSON before using in query
    try {
      JSON.parse(errorData); // Validate JSON structure
    } catch {
      console.error('[EmbeddingWorker] Invalid error data JSON');
      return;
    }

    await db
      .updateTable('memories')
      .set({
        metadata: sql`COALESCE(metadata, '{}'::jsonb) || ${errorData}::jsonb`,
        updated_at: new Date(),
      })
      .where('id', '=', job.data.memoryId)
      .execute();

    // Log to dead letter queue tracking with sanitized output
    console.error(`[EmbeddingWorker] Non-retryable error for job ${job.id}:`, {
      memoryId: job.data.memoryId,
      error: sanitizedError,
      // Omit stack trace in production to prevent information disclosure
      ...(process.env.NODE_ENV === 'development' ? { stack: (error as Error).stack } : {}),
    });
  }

  private async updateMetrics(status: 'success' | 'failure', duration: number): Promise<void> {
    if (!this.connection) return;

    const metricsKey = `metrics:embedding:${status}`;
    const countKey = `${metricsKey}:count`;
    const durationKey = `${metricsKey}:duration`;

    await this.connection.incr(countKey);
    await this.connection.lpush(durationKey, duration);
    await this.connection.ltrim(durationKey, 0, 99); // Keep last 100 durations
  }

  async getMetrics() {
    if (!this.connection) {
      return {
        success: { count: 0, avgDuration: 0 },
        failure: { count: 0, avgDuration: 0 },
        successRate: 100,
      };
    }

    const [successCount, failureCount, successDurations, failureDurations] = await Promise.all([
      this.connection.get('metrics:embedding:success:count'),
      this.connection.get('metrics:embedding:failure:count'),
      this.connection.lrange('metrics:embedding:success:duration', 0, -1),
      this.connection.lrange('metrics:embedding:failure:duration', 0, -1),
    ]);

    const calculateAverage = (durations: string[]) => {
      if (!durations.length) return 0;
      const sum = durations.reduce((acc, d) => acc + parseInt(d, 10), 0);
      return Math.round(sum / durations.length);
    };

    return {
      success: {
        count: parseInt(successCount || '0', 10),
        avgDuration: calculateAverage(successDurations),
      },
      failure: {
        count: parseInt(failureCount || '0', 10),
        avgDuration: calculateAverage(failureDurations),
      },
      successRate:
        successCount && failureCount
          ? (parseInt(successCount, 10) /
              (parseInt(successCount, 10) + parseInt(failureCount, 10))) *
            100
          : 100,
    };
  }

  async pause(): Promise<void> {
    await this.worker.pause();
    console.log('[EmbeddingWorker] Worker paused');
  }

  async resume(): Promise<void> {
    await this.worker.resume();
    console.log('[EmbeddingWorker] Worker resumed');
  }

  async shutdown(): Promise<void> {
    console.log('[EmbeddingWorker] Closing worker...');
    await this.worker.close();
    if (this.connection) {
      await this.connection.quit();
    }
    console.log('[EmbeddingWorker] Worker closed');
  }
}

// Start worker if run directly
if (import.meta.url === `file://${process.argv[1]}`) {
  new EmbeddingWorker();
  console.log('[EmbeddingWorker] Worker started');
}
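
For reference, the worker above only consumes jobs from the 'embedding-generation' queue; the producer side lives in queue-service.js, which is not shown here. The sketch below illustrates how a job carrying the { memoryId } payload the worker reads could be enqueued with BullMQ directly — the file name, job name, and retry options are illustrative assumptions, not the project's actual queue service.

// enqueue-embedding.ts — hypothetical producer sketch; the real project wires this
// through its own queue-service.js, so names and options here are assumptions.
import { Queue } from 'bullmq';
import Redis from 'ioredis';

const connection = new Redis(process.env.REDIS_URL || 'redis://localhost:6379', {
  maxRetriesPerRequest: null, // BullMQ requires this for its Redis connections
});

// Same queue name the worker listens on: 'embedding-generation'
const embeddingQueue = new Queue('embedding-generation', { connection });

export async function enqueueEmbedding(memoryId: string): Promise<void> {
  // The worker only reads job.data.memoryId, so that is the whole payload here
  await embeddingQueue.add(
    'generate-embedding', // job name (illustrative)
    { memoryId },
    {
      attempts: 3, // allows the worker's re-thrown retryable errors to be retried
      backoff: { type: 'exponential', delay: 2000 },
      removeOnComplete: 100, // keep only the last 100 completed jobs
    }
  );
}

Setting attempts above 1 on the producer side is what lets the worker's re-thrown retryable errors actually trigger BullMQ retries; non-retryable errors are absorbed by handleNonRetryableError() and never reach that path.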
