Skip to main content
Glama

docs-mcp-server

PipelineManager.ts25.1 kB
/** * PipelineManager orchestrates a queue of scraping/indexing jobs. * - Controls concurrency, recovery, and job lifecycle * - Bridges in-memory job state with the persistent store * - Delegates execution to PipelineWorker and emits callbacks * Note: completionPromise has an attached no-op catch to avoid unhandled * promise rejection warnings when a job fails before a consumer awaits it. */ import { v4 as uuidv4 } from "uuid"; import { ScraperRegistry, ScraperService } from "../scraper"; import type { ScraperOptions, ScraperProgress } from "../scraper/types"; import type { DocumentManagementService } from "../store"; import { VersionStatus } from "../store/types"; import { DEFAULT_MAX_CONCURRENCY } from "../utils/config"; import { logger } from "../utils/logger"; import { CancellationError, PipelineStateError } from "./errors"; import { PipelineWorker } from "./PipelineWorker"; // Import the worker import type { IPipeline } from "./trpc/interfaces"; import type { InternalPipelineJob, PipelineJob, PipelineManagerCallbacks } from "./types"; import { PipelineJobStatus } from "./types"; /** * Manages a queue of document processing jobs, controlling concurrency and tracking progress. */ export class PipelineManager implements IPipeline { private jobMap: Map<string, InternalPipelineJob> = new Map(); private jobQueue: string[] = []; private activeWorkers: Set<string> = new Set(); private isRunning = false; private concurrency: number; private callbacks: PipelineManagerCallbacks = {}; private composedCallbacks: PipelineManagerCallbacks = {}; private store: DocumentManagementService; private scraperService: ScraperService; private shouldRecoverJobs: boolean; constructor( store: DocumentManagementService, concurrency: number = DEFAULT_MAX_CONCURRENCY, options: { recoverJobs?: boolean } = {}, ) { this.store = store; this.concurrency = concurrency; this.shouldRecoverJobs = options.recoverJobs ?? true; // Default to true for backward compatibility // ScraperService needs a registry. We create one internally for the manager. const registry = new ScraperRegistry(); this.scraperService = new ScraperService(registry); // Initialize composed callbacks to ensure progress persistence even before setCallbacks is called this.rebuildComposedCallbacks(); } /** * Registers callback handlers for pipeline manager events. */ setCallbacks(callbacks: PipelineManagerCallbacks): void { this.callbacks = callbacks || {}; this.rebuildComposedCallbacks(); } /** Build composed callbacks that ensure persistence then delegate to user callbacks */ private rebuildComposedCallbacks(): void { const user = this.callbacks; this.composedCallbacks = { onJobProgress: async (job, progress) => { await this.updateJobProgress(job, progress); await user.onJobProgress?.(job, progress); }, onJobStatusChange: async (job) => { await user.onJobStatusChange?.(job); }, onJobError: async (job, error, document) => { await user.onJobError?.(job, error, document); }, }; } /** * Converts internal job representation to public job interface. */ private toPublicJob(job: InternalPipelineJob): PipelineJob { return { id: job.id, library: job.library, version: job.version || null, // Convert empty string to null for public API status: job.status, progress: job.progress, error: job.error ? { message: job.error.message } : null, createdAt: job.createdAt, startedAt: job.startedAt, finishedAt: job.finishedAt, versionId: job.versionId, versionStatus: job.versionStatus, progressPages: job.progressPages, progressMaxPages: job.progressMaxPages, errorMessage: job.errorMessage, updatedAt: job.updatedAt, sourceUrl: job.sourceUrl, scraperOptions: job.scraperOptions, }; } /** * Starts the pipeline manager's worker processing. */ async start(): Promise<void> { if (this.isRunning) { logger.warn("⚠️ PipelineManager is already running."); return; } this.isRunning = true; logger.debug( `PipelineManager started with concurrency ${this.concurrency}, recoverJobs: ${this.shouldRecoverJobs}.`, ); // Recover pending jobs from database on startup only if enabled if (this.shouldRecoverJobs) { await this.recoverPendingJobs(); } else { logger.debug("Job recovery disabled for this PipelineManager instance"); } this._processQueue().catch((error) => { logger.error(`❌ Error in processQueue during start: ${error}`); }); // Start processing any existing jobs } /** * Recovers pending jobs from the database after server restart. * Finds versions with RUNNING status and resets them to QUEUED for re-processing. * Also loads all QUEUED versions back into the pipeline queue. */ async recoverPendingJobs(): Promise<void> { try { // Reset RUNNING jobs to QUEUED (they were interrupted by server restart) const runningVersions = await this.store.getVersionsByStatus([ VersionStatus.RUNNING, ]); for (const version of runningVersions) { await this.store.updateVersionStatus(version.id, VersionStatus.QUEUED); logger.info( `🔄 Reset interrupted job to QUEUED: ${version.library_name}@${version.name || "unversioned"}`, ); } // Load all QUEUED versions back into pipeline const queuedVersions = await this.store.getVersionsByStatus([VersionStatus.QUEUED]); for (const version of queuedVersions) { // Create complete job with all database state restored const jobId = uuidv4(); const abortController = new AbortController(); let resolveCompletion!: () => void; let rejectCompletion!: (reason?: unknown) => void; const completionPromise = new Promise<void>((resolve, reject) => { resolveCompletion = resolve; rejectCompletion = reject; }); // Prevent unhandled rejection warnings if rejection occurs before consumers attach handlers completionPromise.catch(() => {}); // Parse stored scraper options let parsedScraperOptions = null; if (version.scraper_options) { try { parsedScraperOptions = JSON.parse(version.scraper_options); } catch (error) { logger.warn( `⚠️ Failed to parse scraper options for ${version.library_name}@${version.name || "unversioned"}: ${error}`, ); } } const job: InternalPipelineJob = { id: jobId, library: version.library_name, version: version.name || "", status: PipelineJobStatus.QUEUED, progress: null, error: null, createdAt: new Date(version.created_at), // For recovered QUEUED jobs, startedAt must be null to reflect queued state. startedAt: null, finishedAt: null, abortController, completionPromise, resolveCompletion, rejectCompletion, // Database fields (single source of truth) versionId: version.id, versionStatus: version.status, progressPages: version.progress_pages, progressMaxPages: version.progress_max_pages, errorMessage: version.error_message, updatedAt: new Date(version.updated_at), sourceUrl: version.source_url, scraperOptions: parsedScraperOptions, }; this.jobMap.set(jobId, job); this.jobQueue.push(jobId); } if (queuedVersions.length > 0) { logger.info(`📥 Recovered ${queuedVersions.length} pending job(s) from database`); } else { logger.debug("No pending jobs to recover from database"); } } catch (error) { logger.error(`❌ Failed to recover pending jobs: ${error}`); } } /** * Stops the pipeline manager and attempts to gracefully shut down workers. * Currently, it just stops processing new jobs. Cancellation of active jobs * needs explicit `cancelJob` calls. */ async stop(): Promise<void> { if (!this.isRunning) { logger.warn("⚠️ PipelineManager is not running."); return; } this.isRunning = false; logger.debug("PipelineManager stopping. No new jobs will be started."); // Cleanup scraper service to prevent resource leaks await this.scraperService.cleanup(); // Note: Does not automatically cancel active jobs. } /** * Enqueues a new document processing job, aborting any existing QUEUED/RUNNING job for the same library+version (including unversioned). */ async enqueueJob( library: string, version: string | undefined | null, options: ScraperOptions, ): Promise<string> { // Normalize version: treat undefined/null as "" (unversioned) const normalizedVersion = version ?? ""; // Extract URL and convert ScraperOptions to VersionScraperOptions const { url, library: _library, version: _version, signal: _signal, ...versionOptions } = options; // Abort any existing QUEUED or RUNNING job for the same library+version const allJobs = await this.getJobs(); const duplicateJobs = allJobs.filter( (job) => job.library === library && (job.version ?? "") === normalizedVersion && // Normalize null to empty string for comparison [PipelineJobStatus.QUEUED, PipelineJobStatus.RUNNING].includes(job.status), ); for (const job of duplicateJobs) { logger.info( `🚫 Aborting duplicate job for ${library}@${normalizedVersion}: ${job.id}`, ); await this.cancelJob(job.id); } const jobId = uuidv4(); const abortController = new AbortController(); let resolveCompletion!: () => void; let rejectCompletion!: (reason?: unknown) => void; const completionPromise = new Promise<void>((resolve, reject) => { resolveCompletion = resolve; rejectCompletion = reject; }); // Prevent unhandled rejection warnings if rejection occurs before consumers attach handlers completionPromise.catch(() => {}); const job: InternalPipelineJob = { id: jobId, library, version: normalizedVersion, status: PipelineJobStatus.QUEUED, progress: null, error: null, createdAt: new Date(), startedAt: null, finishedAt: null, abortController, completionPromise, resolveCompletion, rejectCompletion, // Database fields (single source of truth) // Will be populated by updateJobStatus progressPages: 0, progressMaxPages: 0, errorMessage: null, updatedAt: new Date(), sourceUrl: url, scraperOptions: versionOptions, }; this.jobMap.set(jobId, job); this.jobQueue.push(jobId); logger.info( `📝 Job enqueued: ${jobId} for ${library}${normalizedVersion ? `@${normalizedVersion}` : " (unversioned)"}`, ); // Update database status to QUEUED await this.updateJobStatus(job, PipelineJobStatus.QUEUED); // Trigger processing if manager is running if (this.isRunning) { this._processQueue().catch((error) => { logger.error(`❌ Error in processQueue during enqueue: ${error}`); }); } return jobId; } /** * Enqueues a job using stored scraper options from a previous indexing run. * If no stored options are found, throws an error. */ async enqueueJobWithStoredOptions( library: string, version: string | undefined | null, ): Promise<string> { const normalizedVersion = version ?? ""; try { // Get the version ID to retrieve stored options const versionId = await this.store.ensureVersion({ library, version: normalizedVersion, }); const stored = await this.store.getScraperOptions(versionId); if (!stored) { throw new Error( `No stored scraper options found for ${library}@${normalizedVersion || "unversioned"}`, ); } const storedOptions = stored.options; // Reconstruct complete scraper options const completeOptions: ScraperOptions = { url: stored.sourceUrl, library, version: normalizedVersion, ...storedOptions, }; logger.info( `🔄 Re-indexing ${library}@${normalizedVersion || "unversioned"} with stored options from ${stored.sourceUrl}`, ); return this.enqueueJob(library, normalizedVersion, completeOptions); } catch (error) { logger.error(`❌ Failed to enqueue job with stored options: ${error}`); throw error; } } /** * Retrieves the current state of a specific job. */ async getJob(jobId: string): Promise<PipelineJob | undefined> { const internalJob = this.jobMap.get(jobId); return internalJob ? this.toPublicJob(internalJob) : undefined; } /** * Retrieves the current state of all jobs (or a subset based on status). */ async getJobs(status?: PipelineJobStatus): Promise<PipelineJob[]> { const allJobs = Array.from(this.jobMap.values()); const filteredJobs = status ? allJobs.filter((job) => job.status === status) : allJobs; return filteredJobs.map((job) => this.toPublicJob(job)); } /** * Returns a promise that resolves when the specified job completes, fails, or is cancelled. * For cancelled jobs, this resolves successfully rather than rejecting. */ async waitForJobCompletion(jobId: string): Promise<void> { const job = this.jobMap.get(jobId); if (!job) { throw new PipelineStateError(`Job not found: ${jobId}`); } try { await job.completionPromise; } catch (error) { // If the job was cancelled, treat it as successful completion if ( error instanceof CancellationError || job.status === PipelineJobStatus.CANCELLED ) { return; // Resolve successfully for cancelled jobs } // Re-throw other errors (failed jobs) throw error; } } /** * Attempts to cancel a queued or running job. */ async cancelJob(jobId: string): Promise<void> { const job = this.jobMap.get(jobId); if (!job) { logger.warn(`❓ Attempted to cancel non-existent job: ${jobId}`); return; } switch (job.status) { case PipelineJobStatus.QUEUED: // Remove from queue and mark as cancelled this.jobQueue = this.jobQueue.filter((id) => id !== jobId); await this.updateJobStatus(job, PipelineJobStatus.CANCELLED); job.finishedAt = new Date(); logger.info(`🚫 Job cancelled (was queued): ${jobId}`); job.rejectCompletion(new PipelineStateError("Job cancelled before starting")); break; case PipelineJobStatus.RUNNING: // Signal cancellation via AbortController await this.updateJobStatus(job, PipelineJobStatus.CANCELLING); job.abortController.abort(); logger.info(`🚫 Signalling cancellation for running job: ${jobId}`); // The worker is responsible for transitioning to CANCELLED and rejecting break; case PipelineJobStatus.COMPLETED: case PipelineJobStatus.FAILED: case PipelineJobStatus.CANCELLED: case PipelineJobStatus.CANCELLING: logger.warn( `⚠️ Job ${jobId} cannot be cancelled in its current state: ${job.status}`, ); break; default: logger.error(`❌ Unhandled job status for cancellation: ${job.status}`); break; } } /** * Removes all jobs that are in a final state (completed, cancelled, or failed). * Only removes jobs that are not currently in the queue or actively running. * @returns The number of jobs that were cleared. */ async clearCompletedJobs(): Promise<number> { const completedStatuses = [ PipelineJobStatus.COMPLETED, PipelineJobStatus.CANCELLED, PipelineJobStatus.FAILED, ]; let clearedCount = 0; const jobsToRemove: string[] = []; // Find all jobs that can be cleared for (const [jobId, job] of this.jobMap.entries()) { if (completedStatuses.includes(job.status)) { jobsToRemove.push(jobId); clearedCount++; } } // Remove the jobs from the map for (const jobId of jobsToRemove) { this.jobMap.delete(jobId); } if (clearedCount > 0) { logger.info(`🧹 Cleared ${clearedCount} completed job(s) from the queue`); } else { logger.debug("No completed jobs to clear"); } return clearedCount; } // --- Private Methods --- /** * Processes the job queue, starting new workers if capacity allows. */ private async _processQueue(): Promise<void> { if (!this.isRunning) return; while (this.activeWorkers.size < this.concurrency && this.jobQueue.length > 0) { const jobId = this.jobQueue.shift(); if (!jobId) continue; // Should not happen, but safety check const job = this.jobMap.get(jobId); if (!job || job.status !== PipelineJobStatus.QUEUED) { logger.warn(`⏭️ Skipping job ${jobId} in queue (not found or not queued).`); continue; } this.activeWorkers.add(jobId); await this.updateJobStatus(job, PipelineJobStatus.RUNNING); job.startedAt = new Date(); // Start the actual job execution asynchronously this._runJob(job).catch(async (error) => { // Catch unexpected errors during job setup/execution not handled by _runJob itself logger.error(`❌ Unhandled error during job ${jobId} execution: ${error}`); if ( job.status !== PipelineJobStatus.FAILED && job.status !== PipelineJobStatus.CANCELLED ) { const errorMessage = error instanceof Error ? error.message : String(error); await this.updateJobStatus(job, PipelineJobStatus.FAILED, errorMessage); job.error = error instanceof Error ? error : new Error(String(error)); job.finishedAt = new Date(); job.rejectCompletion(job.error); } this.activeWorkers.delete(jobId); this._processQueue().catch((error) => { logger.error(`❌ Error in processQueue after job completion: ${error}`); }); // Check if another job can start }); } } /** * Executes a single pipeline job by delegating to a PipelineWorker. * Handles final status updates and promise resolution/rejection. */ private async _runJob(job: InternalPipelineJob): Promise<void> { const { id: jobId, abortController } = job; const signal = abortController.signal; // Get signal for error checking // Instantiate a worker for this job. // Dependencies (store, scraperService) are held by the manager. const worker = new PipelineWorker(this.store, this.scraperService); try { // Delegate the actual work to the worker using composed callbacks await worker.executeJob(job, this.composedCallbacks); // If executeJob completes without throwing, and we weren't cancelled meanwhile... if (signal.aborted) { // Check signal again in case cancellation happened *during* the very last await in executeJob throw new CancellationError("Job cancelled just before completion"); } // Mark as completed await this.updateJobStatus(job, PipelineJobStatus.COMPLETED); job.finishedAt = new Date(); job.resolveCompletion(); logger.info(`✅ Job completed: ${jobId}`); } catch (error) { // Handle errors thrown by the worker, including CancellationError if (error instanceof CancellationError || signal.aborted) { // Explicitly check for CancellationError or if the signal was aborted await this.updateJobStatus(job, PipelineJobStatus.CANCELLED); job.finishedAt = new Date(); // Don't set job.error for cancellations - cancellation is not an error condition const cancellationError = error instanceof CancellationError ? error : new CancellationError("Job cancelled by signal"); logger.info(`🚫 Job execution cancelled: ${jobId}: ${cancellationError.message}`); job.rejectCompletion(cancellationError); } else { // Handle other errors const errorMessage = error instanceof Error ? error.message : String(error); await this.updateJobStatus(job, PipelineJobStatus.FAILED, errorMessage); job.error = error instanceof Error ? error : new Error(String(error)); job.finishedAt = new Date(); logger.error(`❌ Job failed: ${jobId}: ${job.error}`); job.rejectCompletion(job.error); } } finally { // Ensure worker slot is freed and queue processing continues this.activeWorkers.delete(jobId); this._processQueue().catch((error) => { logger.error(`❌ Error in processQueue after job cleanup: ${error}`); }); } } /** * Maps PipelineJobStatus to VersionStatus for database storage. */ private mapJobStatusToVersionStatus(jobStatus: PipelineJobStatus): VersionStatus { switch (jobStatus) { case PipelineJobStatus.QUEUED: return VersionStatus.QUEUED; case PipelineJobStatus.RUNNING: return VersionStatus.RUNNING; case PipelineJobStatus.COMPLETED: return VersionStatus.COMPLETED; case PipelineJobStatus.FAILED: return VersionStatus.FAILED; case PipelineJobStatus.CANCELLED: return VersionStatus.CANCELLED; case PipelineJobStatus.CANCELLING: return VersionStatus.RUNNING; // Keep as running in DB until actually cancelled default: return VersionStatus.NOT_INDEXED; } } /** * Updates both in-memory job status and database version status (write-through). */ private async updateJobStatus( job: InternalPipelineJob, newStatus: PipelineJobStatus, errorMessage?: string, ): Promise<void> { // Update in-memory status job.status = newStatus; if (errorMessage) { job.errorMessage = errorMessage; } job.updatedAt = new Date(); // Update database status try { // Ensure the library and version exist and get the version ID const versionId = await this.store.ensureLibraryAndVersion( job.library, job.version, ); // Update job object with database fields (single source of truth) job.versionId = versionId; job.versionStatus = this.mapJobStatusToVersionStatus(newStatus); const dbStatus = this.mapJobStatusToVersionStatus(newStatus); await this.store.updateVersionStatus(versionId, dbStatus, errorMessage); // Store scraper options when job is first queued if (newStatus === PipelineJobStatus.QUEUED && job.scraperOptions) { try { // Reconstruct ScraperOptions for storage (DocumentStore will filter runtime fields) const fullOptions = { url: job.sourceUrl ?? "", library: job.library, version: job.version, ...job.scraperOptions, }; await this.store.storeScraperOptions(versionId, fullOptions); logger.debug( `Stored scraper options for ${job.library}@${job.version}: ${job.sourceUrl}`, ); } catch (optionsError) { // Log warning but don't fail the job - options storage is not critical logger.warn( `⚠️ Failed to store scraper options for job ${job.id}: ${optionsError}`, ); } } } catch (error) { logger.error(`❌ Failed to update database status for job ${job.id}: ${error}`); // Don't throw - we don't want to break the pipeline for database issues } // Fire callback await this.callbacks.onJobStatusChange?.(job); } /** * Updates both in-memory job progress and database progress (write-through). */ async updateJobProgress( job: InternalPipelineJob, progress: ScraperProgress, ): Promise<void> { // Update in-memory progress job.progress = progress; job.progressPages = progress.pagesScraped; job.progressMaxPages = progress.totalPages; job.updatedAt = new Date(); // Update database progress if we have a version ID if (job.versionId) { try { await this.store.updateVersionProgress( job.versionId, progress.pagesScraped, progress.totalPages, ); } catch (error) { logger.error(`❌ Failed to update database progress for job ${job.id}: ${error}`); // Don't throw - we don't want to break the pipeline for database issues } } // Note: Do not invoke onJobProgress callback here. // Callbacks are wired by services (e.g., workerService/CLI) and already call this method. } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/arabold/docs-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server