Skip to main content
Glama
workers.service.ts12.4 kB
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { WorkerManager, RedisCacheService, JobsMetadataService, } from '@snakagent/workers'; import { logger } from '@snakagent/core'; import { ConfigurationService } from '../../config/configuration.js'; import { JobNotFoundError, JobNotCompletedError, JobFailedError, JobAccessDeniedError, UnknownJobStatusError, } from '../../common/errors/job.errors.js'; @Injectable() export class WorkersService implements OnModuleInit, OnModuleDestroy { private workerManager: WorkerManager; private cacheService: RedisCacheService; constructor( private readonly config: ConfigurationService, private readonly jobsMetadataService: JobsMetadataService ) { this.cacheService = new RedisCacheService(this.config.redis); this.workerManager = new WorkerManager( this.config.redis, this.cacheService ); } async onModuleInit() { try { logger.info('Initializing workers service...'); await this.workerManager.start(); // Cache cleanup handled automatically logger.info('Workers service initialized successfully'); } catch (error) { logger.error('Failed to initialize workers service:', error); throw error; } } async onModuleDestroy() { try { logger.info('Shutting down workers service...'); await this.workerManager.stop(); // Cache cleanup handled automatically logger.info('Workers service shutdown completed'); } catch (error) { logger.error('Error during workers service shutdown:', error); } } /** * Process a file asynchronously using the file ingestion queue */ async processFileAsync( agentId: string, userId: string, fileId: string, originalName: string, mimeType: string, buffer: Buffer, size: number ): Promise<string> { const fileIngestionQueue = this.workerManager .getJobProcessor() .getFileIngestionQueue(); const job = await fileIngestionQueue.addFileIngestionJob({ agentId, userId, fileId, originalName, mimeType, buffer, size, }); const jobId = job.id?.toString(); if (!jobId) { throw new JobNotFoundError('unknown', { message: 'Failed to get job ID from queue', context: 'processFileAsync', }); } logger.info(`File ingestion job added to queue with ID: ${jobId}`); try { await this.jobsMetadataService.createJobMetadata({ type: 'file-ingestion' as any, agentId, userId, payload: { jobId, agentId, userId }, }); logger.debug(`Created job metadata for job ${jobId}`); } catch (error) { logger.error(`Failed to create job metadata for ${jobId}:`, error); // Don't let cache failures break the main operation } logger.info(`File processing queued with job ID: ${jobId}`); return jobId; } /** * Get job status by ID */ async getJobStatus(jobId: string): Promise<{ id: string; status: string; error?: string; createdAt?: Date; processedOn?: Date; finishedOn?: Date; } | null> { const fileIngestionQueue = this.workerManager .getJobProcessor() .getFileIngestionQueue(); const job = await fileIngestionQueue.getQueue().getJob(jobId); if (!job) { return null; } return { id: job.id?.toString() || '', status: await job.getState(), error: job.failedReason, createdAt: new Date(job.timestamp), processedOn: job.processedOn ? new Date(job.processedOn) : undefined, finishedOn: job.finishedOn ? new Date(job.finishedOn) : undefined, }; } /** * Get job status by ID with user validation */ async getJobStatusForUser( jobId: string, userId: string ): Promise<{ id: string; status: string; error?: string; createdAt?: Date; processedOn?: Date; finishedOn?: Date; } | null> { logger.info(`Getting job status for ${jobId} (user: ${userId})`); const cachedStatus = await this.cacheService.getJobRetrievalResult(jobId); if ( cachedStatus && (cachedStatus.status === 'completed' || cachedStatus.status === 'failed') ) { logger.debug( `Cache hit for completed/failed job status ${jobId} (user: ${userId})` ); return { id: cachedStatus.jobId, status: cachedStatus.status, error: cachedStatus.error, createdAt: cachedStatus.createdAt, finishedOn: cachedStatus.completedAt, }; } else if (cachedStatus) { logger.debug( `Cache hit for active job status ${jobId} (user: ${userId}), but checking for updates...` ); } try { const jobMetadata = await this.jobsMetadataService.getJobMetadataForUser( jobId, userId ); if (jobMetadata) { if (jobMetadata.userId !== userId) { logger.error( `Job metadata ownership mismatch for job ${jobId}: metadata.userId=${jobMetadata.userId}, requested.userId=${userId}` ); throw new JobAccessDeniedError(jobId, userId); } else { const statusString = String(jobMetadata.status); const isCompleted = statusString === 'completed'; const isFailed = statusString === 'failed'; if (isCompleted || isFailed) { const status = { id: jobMetadata.jobId, status: statusString, error: jobMetadata.error, createdAt: jobMetadata.createdAt, processedOn: jobMetadata.startedAt, finishedOn: jobMetadata.completedAt, }; await this.cacheService.setJobRetrievalResult(jobId, { jobId, agentId: jobMetadata.agentId || '', userId: jobMetadata.userId, status: status.status as any, data: null, error: status.error, createdAt: status.createdAt, completedAt: status.finishedOn, source: 'database' as any, }); logger.debug( `Retrieved completed job status from metadata for ${jobId} (user: ${userId})` ); return { id: jobId, status: status.status, error: status.error, createdAt: status.createdAt, processedOn: status.processedOn, finishedOn: status.finishedOn, }; } else { logger.debug( `Job metadata found but status is active (${jobMetadata.status}), checking Bull queue for ${jobId}` ); } } } } catch (error) { logger.error(`Failed to get job metadata for ${jobId}:`, error); } const fileIngestionQueue = this.workerManager .getJobProcessor() .getFileIngestionQueue(); const job = await fileIngestionQueue.getQueue().getJob(jobId); if (!job) { logger.warn(`Job ${jobId} not found in queue`); return null; } if (job.data.userId !== userId) { logger.error( `Job ownership mismatch for job ${jobId}: job.userId=${job.data.userId}, requested.userId=${userId}` ); throw new JobAccessDeniedError(jobId, userId); } const jobState = await job.getState(); logger.info(`Job ${jobId} state from Bull queue: ${jobState}`); const status = { id: job.id?.toString() || '', status: jobState, error: job.failedReason, createdAt: new Date(job.timestamp), processedOn: job.processedOn ? new Date(job.processedOn) : undefined, finishedOn: job.finishedOn ? new Date(job.finishedOn) : undefined, }; const ttlMs = status.status === 'completed' || status.status === 'failed' ? undefined : 30000; await this.cacheService.setJobRetrievalResult( jobId, { jobId, agentId: job.data.agentId || '', userId: userId, status: status.status as any, data: null, error: status.error, createdAt: status.createdAt, completedAt: status.finishedOn, source: 'bull' as any, }, ttlMs ); return status; } /** * Get job status directly from Bull queue (bypassing cache and metadata) * This is useful when cache contains stale data */ async getJobStatusFromBull( jobId: string, userId: string ): Promise<{ id: string; status: string; error?: string; createdAt?: Date; processedOn?: Date; finishedOn?: Date; } | null> { logger.info( `Getting job status directly from Bull for ${jobId} (user: ${userId})` ); const fileIngestionQueue = this.workerManager .getJobProcessor() .getFileIngestionQueue(); const job = await fileIngestionQueue.getQueue().getJob(jobId); if (!job) { logger.warn(`Job ${jobId} not found in Bull queue`); return null; } if (job.data.userId !== userId) { logger.error( `Job ownership mismatch for job ${jobId}: job.userId=${job.data.userId}, requested.userId=${userId}` ); throw new JobAccessDeniedError(jobId, userId); } const jobState = await job.getState(); logger.info(`Job ${jobId} state from Bull queue: ${jobState}`); const status = { id: job.id?.toString() || '', status: jobState, error: job.failedReason, createdAt: new Date(job.timestamp), processedOn: job.processedOn ? new Date(job.processedOn) : undefined, finishedOn: job.finishedOn ? new Date(job.finishedOn) : undefined, }; await this.cacheService.setJobRetrievalResult(jobId, { jobId, agentId: job.data.agentId || '', userId: userId, status: status.status as any, data: null, error: status.error, createdAt: status.createdAt, completedAt: status.finishedOn, source: 'bull' as any, }); return status; } /** * Get job result by ID */ async getJobResult(jobId: string): Promise<any> { const fileIngestionQueue = this.workerManager .getJobProcessor() .getFileIngestionQueue(); const job = await fileIngestionQueue.getQueue().getJob(jobId); if (!job) { throw new JobNotFoundError(jobId); } const state = await job.getState(); if (state === 'failed') { throw new JobFailedError(jobId, job.failedReason); } if (state !== 'completed') { throw new JobNotCompletedError(jobId, state); } return job.returnvalue; } /** * Get job result by ID with user validation */ async getJobResultForUser(jobId: string, userId: string): Promise<any> { try { // Get job result from metadata service const jobMetadata = await this.jobsMetadataService.getJobMetadataForUser( jobId, userId ); if (!jobMetadata) { throw new JobNotFoundError(jobId); } const result = { status: jobMetadata.status === 'completed' ? 'completed' : jobMetadata.status === 'failed' ? 'failed' : 'processing', data: jobMetadata.result, error: jobMetadata.error, }; switch (result.status) { case 'completed': return result.data; case 'processing': throw new JobNotCompletedError(jobId, result.status); case 'failed': throw new JobFailedError(jobId, result.error); default: throw new UnknownJobStatusError(jobId, result.status); } } catch (error) { logger.error(`Failed to get job result for ${jobId}:`, error); throw error; } } /** * Get metrics for all queues */ async getQueueMetrics(): Promise<any> { return await this.workerManager.getMetrics(); } /** * Check if workers are active */ isActive(): boolean { return this.workerManager.isActive(); } /** * Get the underlying worker manager instance */ getWorkerManager(): WorkerManager { return this.workerManager; } /** * Get diagnostic information about the workers */ async getDiagnostics(): Promise<any> { return await this.workerManager.getDiagnostics(); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/KasarLabs/snak'

If you have feedback or need assistance with the MCP directory API, please join our Discord server