workers.service.ts•12.4 kB
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import {
WorkerManager,
RedisCacheService,
JobsMetadataService,
} from '@snakagent/workers';
import { logger } from '@snakagent/core';
import { ConfigurationService } from '../../config/configuration.js';
import {
JobNotFoundError,
JobNotCompletedError,
JobFailedError,
JobAccessDeniedError,
UnknownJobStatusError,
} from '../../common/errors/job.errors.js';
@Injectable()
export class WorkersService implements OnModuleInit, OnModuleDestroy {
private workerManager: WorkerManager;
private cacheService: RedisCacheService;
constructor(
private readonly config: ConfigurationService,
private readonly jobsMetadataService: JobsMetadataService
) {
this.cacheService = new RedisCacheService(this.config.redis);
this.workerManager = new WorkerManager(
this.config.redis,
this.cacheService
);
}
async onModuleInit() {
try {
logger.info('Initializing workers service...');
await this.workerManager.start();
// Cache cleanup handled automatically
logger.info('Workers service initialized successfully');
} catch (error) {
logger.error('Failed to initialize workers service:', error);
throw error;
}
}
async onModuleDestroy() {
try {
logger.info('Shutting down workers service...');
await this.workerManager.stop();
// Cache cleanup handled automatically
logger.info('Workers service shutdown completed');
} catch (error) {
logger.error('Error during workers service shutdown:', error);
}
}
/**
* Process a file asynchronously using the file ingestion queue
*/
async processFileAsync(
agentId: string,
userId: string,
fileId: string,
originalName: string,
mimeType: string,
buffer: Buffer,
size: number
): Promise<string> {
const fileIngestionQueue = this.workerManager
.getJobProcessor()
.getFileIngestionQueue();
const job = await fileIngestionQueue.addFileIngestionJob({
agentId,
userId,
fileId,
originalName,
mimeType,
buffer,
size,
});
const jobId = job.id?.toString();
if (!jobId) {
throw new JobNotFoundError('unknown', {
message: 'Failed to get job ID from queue',
context: 'processFileAsync',
});
}
logger.info(`File ingestion job added to queue with ID: ${jobId}`);
try {
await this.jobsMetadataService.createJobMetadata({
type: 'file-ingestion' as any,
agentId,
userId,
payload: { jobId, agentId, userId },
});
logger.debug(`Created job metadata for job ${jobId}`);
} catch (error) {
logger.error(`Failed to create job metadata for ${jobId}:`, error);
// Don't let cache failures break the main operation
}
logger.info(`File processing queued with job ID: ${jobId}`);
return jobId;
}
/**
* Get job status by ID
*/
async getJobStatus(jobId: string): Promise<{
id: string;
status: string;
error?: string;
createdAt?: Date;
processedOn?: Date;
finishedOn?: Date;
} | null> {
const fileIngestionQueue = this.workerManager
.getJobProcessor()
.getFileIngestionQueue();
const job = await fileIngestionQueue.getQueue().getJob(jobId);
if (!job) {
return null;
}
return {
id: job.id?.toString() || '',
status: await job.getState(),
error: job.failedReason,
createdAt: new Date(job.timestamp),
processedOn: job.processedOn ? new Date(job.processedOn) : undefined,
finishedOn: job.finishedOn ? new Date(job.finishedOn) : undefined,
};
}
/**
* Get job status by ID with user validation
*/
async getJobStatusForUser(
jobId: string,
userId: string
): Promise<{
id: string;
status: string;
error?: string;
createdAt?: Date;
processedOn?: Date;
finishedOn?: Date;
} | null> {
logger.info(`Getting job status for ${jobId} (user: ${userId})`);
const cachedStatus = await this.cacheService.getJobRetrievalResult(jobId);
if (
cachedStatus &&
(cachedStatus.status === 'completed' || cachedStatus.status === 'failed')
) {
logger.debug(
`Cache hit for completed/failed job status ${jobId} (user: ${userId})`
);
return {
id: cachedStatus.jobId,
status: cachedStatus.status,
error: cachedStatus.error,
createdAt: cachedStatus.createdAt,
finishedOn: cachedStatus.completedAt,
};
} else if (cachedStatus) {
logger.debug(
`Cache hit for active job status ${jobId} (user: ${userId}), but checking for updates...`
);
}
try {
const jobMetadata = await this.jobsMetadataService.getJobMetadataForUser(
jobId,
userId
);
if (jobMetadata) {
if (jobMetadata.userId !== userId) {
logger.error(
`Job metadata ownership mismatch for job ${jobId}: metadata.userId=${jobMetadata.userId}, requested.userId=${userId}`
);
throw new JobAccessDeniedError(jobId, userId);
} else {
const statusString = String(jobMetadata.status);
const isCompleted = statusString === 'completed';
const isFailed = statusString === 'failed';
if (isCompleted || isFailed) {
const status = {
id: jobMetadata.jobId,
status: statusString,
error: jobMetadata.error,
createdAt: jobMetadata.createdAt,
processedOn: jobMetadata.startedAt,
finishedOn: jobMetadata.completedAt,
};
await this.cacheService.setJobRetrievalResult(jobId, {
jobId,
agentId: jobMetadata.agentId || '',
userId: jobMetadata.userId,
status: status.status as any,
data: null,
error: status.error,
createdAt: status.createdAt,
completedAt: status.finishedOn,
source: 'database' as any,
});
logger.debug(
`Retrieved completed job status from metadata for ${jobId} (user: ${userId})`
);
return {
id: jobId,
status: status.status,
error: status.error,
createdAt: status.createdAt,
processedOn: status.processedOn,
finishedOn: status.finishedOn,
};
} else {
logger.debug(
`Job metadata found but status is active (${jobMetadata.status}), checking Bull queue for ${jobId}`
);
}
}
}
} catch (error) {
logger.error(`Failed to get job metadata for ${jobId}:`, error);
}
const fileIngestionQueue = this.workerManager
.getJobProcessor()
.getFileIngestionQueue();
const job = await fileIngestionQueue.getQueue().getJob(jobId);
if (!job) {
logger.warn(`Job ${jobId} not found in queue`);
return null;
}
if (job.data.userId !== userId) {
logger.error(
`Job ownership mismatch for job ${jobId}: job.userId=${job.data.userId}, requested.userId=${userId}`
);
throw new JobAccessDeniedError(jobId, userId);
}
const jobState = await job.getState();
logger.info(`Job ${jobId} state from Bull queue: ${jobState}`);
const status = {
id: job.id?.toString() || '',
status: jobState,
error: job.failedReason,
createdAt: new Date(job.timestamp),
processedOn: job.processedOn ? new Date(job.processedOn) : undefined,
finishedOn: job.finishedOn ? new Date(job.finishedOn) : undefined,
};
const ttlMs =
status.status === 'completed' || status.status === 'failed'
? undefined
: 30000;
await this.cacheService.setJobRetrievalResult(
jobId,
{
jobId,
agentId: job.data.agentId || '',
userId: userId,
status: status.status as any,
data: null,
error: status.error,
createdAt: status.createdAt,
completedAt: status.finishedOn,
source: 'bull' as any,
},
ttlMs
);
return status;
}
/**
* Get job status directly from Bull queue (bypassing cache and metadata)
* This is useful when cache contains stale data
*/
async getJobStatusFromBull(
jobId: string,
userId: string
): Promise<{
id: string;
status: string;
error?: string;
createdAt?: Date;
processedOn?: Date;
finishedOn?: Date;
} | null> {
logger.info(
`Getting job status directly from Bull for ${jobId} (user: ${userId})`
);
const fileIngestionQueue = this.workerManager
.getJobProcessor()
.getFileIngestionQueue();
const job = await fileIngestionQueue.getQueue().getJob(jobId);
if (!job) {
logger.warn(`Job ${jobId} not found in Bull queue`);
return null;
}
if (job.data.userId !== userId) {
logger.error(
`Job ownership mismatch for job ${jobId}: job.userId=${job.data.userId}, requested.userId=${userId}`
);
throw new JobAccessDeniedError(jobId, userId);
}
const jobState = await job.getState();
logger.info(`Job ${jobId} state from Bull queue: ${jobState}`);
const status = {
id: job.id?.toString() || '',
status: jobState,
error: job.failedReason,
createdAt: new Date(job.timestamp),
processedOn: job.processedOn ? new Date(job.processedOn) : undefined,
finishedOn: job.finishedOn ? new Date(job.finishedOn) : undefined,
};
await this.cacheService.setJobRetrievalResult(jobId, {
jobId,
agentId: job.data.agentId || '',
userId: userId,
status: status.status as any,
data: null,
error: status.error,
createdAt: status.createdAt,
completedAt: status.finishedOn,
source: 'bull' as any,
});
return status;
}
/**
* Get job result by ID
*/
async getJobResult(jobId: string): Promise<any> {
const fileIngestionQueue = this.workerManager
.getJobProcessor()
.getFileIngestionQueue();
const job = await fileIngestionQueue.getQueue().getJob(jobId);
if (!job) {
throw new JobNotFoundError(jobId);
}
const state = await job.getState();
if (state === 'failed') {
throw new JobFailedError(jobId, job.failedReason);
}
if (state !== 'completed') {
throw new JobNotCompletedError(jobId, state);
}
return job.returnvalue;
}
/**
* Get job result by ID with user validation
*/
async getJobResultForUser(jobId: string, userId: string): Promise<any> {
try {
// Get job result from metadata service
const jobMetadata = await this.jobsMetadataService.getJobMetadataForUser(
jobId,
userId
);
if (!jobMetadata) {
throw new JobNotFoundError(jobId);
}
const result = {
status:
jobMetadata.status === 'completed'
? 'completed'
: jobMetadata.status === 'failed'
? 'failed'
: 'processing',
data: jobMetadata.result,
error: jobMetadata.error,
};
switch (result.status) {
case 'completed':
return result.data;
case 'processing':
throw new JobNotCompletedError(jobId, result.status);
case 'failed':
throw new JobFailedError(jobId, result.error);
default:
throw new UnknownJobStatusError(jobId, result.status);
}
} catch (error) {
logger.error(`Failed to get job result for ${jobId}:`, error);
throw error;
}
}
/**
* Get metrics for all queues
*/
async getQueueMetrics(): Promise<any> {
return await this.workerManager.getMetrics();
}
/**
* Check if workers are active
*/
isActive(): boolean {
return this.workerManager.isActive();
}
/**
* Get the underlying worker manager instance
*/
getWorkerManager(): WorkerManager {
return this.workerManager;
}
/**
* Get diagnostic information about the workers
*/
async getDiagnostics(): Promise<any> {
return await this.workerManager.getDiagnostics();
}
}