Skip to main content
Glama
job-processor.ts9.11 kB
import type { Job } from 'bull'; import { QueueManager, FileIngestionQueue } from '../queues/index.js'; import { FileIngestionProcessor } from './file-ingestion-processor.js'; import { logger } from '@snakagent/core'; import { FileIngestionResult, JobStatus, ResultSource, ResultStatus, } from '../types/index.js'; import { RedisCacheService } from '../services/cache/redis-cache.service.js'; import { JobsMetadataService } from '../services/jobs/jobs-metadata.service.js'; export class JobProcessor { private readonly queueManager: QueueManager; private fileIngestionQueue: FileIngestionQueue | null = null; private readonly fileIngestionProcessor: FileIngestionProcessor; private readonly cacheService: RedisCacheService; private isProcessingStarted: boolean = false; private isFileIngestionProcessorRegistered: boolean = false; constructor( queueManager: QueueManager, fileIngestionProcessor: FileIngestionProcessor, cacheService: RedisCacheService, private readonly jobsMetadataService: JobsMetadataService ) { this.queueManager = queueManager; this.fileIngestionProcessor = fileIngestionProcessor; this.cacheService = cacheService; } async initialize(): Promise<void> { if (this.fileIngestionQueue) { logger.warn('JobProcessor already initialized'); return; } this.fileIngestionQueue = new FileIngestionQueue(this.queueManager); } async startProcessing(): Promise<void> { if (this.isProcessingStarted) { logger.info('Job processing is already started'); return; } const config = this.queueManager.getConfig(); await this.startFileIngestionProcessing(config.concurrency.fileIngestion); this.isProcessingStarted = true; logger.info('All job processors started'); } private async startFileIngestionProcessing( concurrency: number ): Promise<void> { if (!this.fileIngestionQueue) { throw new Error('FileIngestionQueue not initialized'); } const queue = this.fileIngestionQueue.getQueue(); if (this.isFileIngestionProcessorRegistered) { logger.info( 'File ingestion processor already registered, ensuring queue is active' ); // Ensure the queue is not paused if (await queue.isPaused()) { await queue.resume(); logger.info('File ingestion queue resumed'); } return; } // Remove only our listeners to avoid duplicates queue.removeListener('error', this.handleQueueError); queue.removeListener('failed', this.handleJobFailed); queue.removeListener('stalled', this.handleJobStalled); queue.removeListener('active', this.handleJobActive); queue.removeListener('completed', this.handleJobCompleted); queue.removeListener('waiting', this.handleJobWaiting); queue.on('error', this.handleQueueError); queue.on('failed', this.handleJobFailed); queue.on('stalled', this.handleJobStalled); queue.on('active', this.handleJobActive); queue.on('completed', this.handleJobCompleted); queue.on('waiting', this.handleJobWaiting); // Ensure the queue is not paused before processing if (await queue.isPaused()) { await queue.resume(); logger.info('File ingestion queue resumed'); } try { queue.process( 'file-ingestion', concurrency, this.handleFileIngestionJob.bind(this) ); this.isFileIngestionProcessorRegistered = true; logger.info( `File ingestion processor started with concurrency: ${concurrency}` ); } catch (error) { logger.error('Failed to register file ingestion processor:', error); throw error; } } private async handleFileIngestionJob(job: Job): Promise<FileIngestionResult> { try { const result = await this.processFileIngestion(job); logger.info(`File ingestion job ${job.id} completed successfully`); try { await this.cacheService.setJobRetrievalResult(job.id.toString(), { jobId: job.id.toString(), agentId: job.data.agentId, userId: job.data.userId, status: ResultStatus.COMPLETED, data: result, error: undefined, createdAt: new Date(job.timestamp), completedAt: new Date(), source: ResultSource.BULL, }); await this.jobsMetadataService.updateJobMetadata(job.id.toString(), { status: JobStatus.COMPLETED, completedAt: new Date(), result: result, }); logger.info(`Updated cache and metadata for completed job ${job.id}`); } catch (cacheError) { logger.error( `Failed to update cache/metadata for completed job ${job.id}:`, cacheError ); // Don't throw here as the job itself succeeded } return result; } catch (error) { logger.error(`File ingestion job ${job.id} failed:`, error); try { await this.cacheService.setJobRetrievalResult(job.id.toString(), { jobId: job.id.toString(), agentId: job.data.agentId, userId: job.data.userId, status: ResultStatus.FAILED, data: null, error: error instanceof Error ? error.message : 'Unknown error', createdAt: new Date(job.timestamp), completedAt: new Date(), source: ResultSource.BULL, }); await this.jobsMetadataService.updateJobMetadata(job.id.toString(), { status: JobStatus.FAILED, // 'failed' is valid in database completedAt: new Date(), error: error instanceof Error ? error.message : 'Unknown error', }); logger.info(`Updated cache and metadata for failed job ${job.id}`); } catch (cacheError) { logger.error( `Failed to update cache/metadata for failed job ${job.id}:`, cacheError ); // Don't throw here as we want to preserve the original error } throw error; } } private async processFileIngestion(job: Job): Promise<FileIngestionResult> { return await this.fileIngestionProcessor.process(job); } async stopProcessing(): Promise<void> { if (!this.isProcessingStarted) { logger.info('Job processing is not started'); return; } const config = this.queueManager.getConfig(); await this.queueManager.pauseQueue(config.queues.fileIngestion); if (this.fileIngestionQueue) { const queue = this.fileIngestionQueue.getQueue(); await queue.close(); this.fileIngestionQueue = null; } this.isProcessingStarted = false; this.isFileIngestionProcessorRegistered = false; logger.info('All job processors stopped'); } getFileIngestionQueue(): FileIngestionQueue { if (!this.fileIngestionQueue) { throw new Error('FileIngestionQueue not initialized'); } return this.fileIngestionQueue; } /** * Reset the processor state - useful for debugging or recovery */ reset(): void { this.isProcessingStarted = false; this.isFileIngestionProcessorRegistered = false; logger.info('Job processor state reset'); } /** * Force restart processing - useful after application restart */ async forceRestartProcessing(): Promise<void> { logger.info('Force restarting job processing...'); if (!this.fileIngestionQueue) { await this.initialize(); } this.isProcessingStarted = false; this.isFileIngestionProcessorRegistered = false; await this.startProcessing(); logger.info('Job processing force restarted'); } /** * Get diagnostic information about the processor state */ async getDiagnostics(): Promise<{ isProcessingStarted: boolean; isFileIngestionProcessorRegistered: boolean; queuePaused: boolean; queueName: string; }> { if (!this.fileIngestionQueue) { throw new Error('FileIngestionQueue not initialized'); } const queue = this.fileIngestionQueue.getQueue(); const queuePaused = await queue.isPaused(); return { isProcessingStarted: this.isProcessingStarted, isFileIngestionProcessorRegistered: this.isFileIngestionProcessorRegistered, queuePaused, queueName: queue.name, }; } // Event handler methods for proper listener management private readonly handleQueueError = (error: Error): void => { logger.error(`File ingestion queue error:`, error); }; private readonly handleJobFailed = (job: Job, err: Error): void => { logger.error(`File ingestion job ${job.id} failed:`, err); }; private readonly handleJobStalled = (job: Job): void => { logger.warn(`File ingestion job ${job.id} stalled`); }; private readonly handleJobActive = (job: Job): void => { logger.info(`File ingestion job ${job.id} is now active`); }; private readonly handleJobCompleted = (job: Job): void => { logger.info(`File ingestion job ${job.id} completed successfully`); }; private readonly handleJobWaiting = (jobId: string): void => { logger.info(`File ingestion job ${jobId} is waiting`); }; }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/KasarLabs/snak'

If you have feedback or need assistance with the MCP directory API, please join our Discord server