PipelineWorker.test.ts
import { beforeEach, describe, expect, it, type Mock, vi } from "vitest";
import type { ScraperService } from "../scraper";
import type { ScrapeResult, ScraperProgressEvent } from "../scraper/types";
import type { DocumentManagementService } from "../store/DocumentManagementService";
import { PipelineWorker } from "./PipelineWorker";
import type { InternalPipelineJob, PipelineManagerCallbacks } from "./types";
import { PipelineJobStatus } from "./types";

// Mock dependencies
vi.mock("../store/DocumentManagementService");
vi.mock("../scraper/ScraperService");

describe("PipelineWorker", () => {
  let mockStore: Partial<DocumentManagementService>;
  let mockScraperService: Partial<ScraperService>;
  let mockCallbacks: PipelineManagerCallbacks;
  let worker: PipelineWorker;
  let mockJob: InternalPipelineJob;
  let abortController: AbortController;

  beforeEach(() => {
    vi.resetAllMocks();

    mockStore = {
      addScrapeResult: vi.fn().mockResolvedValue(undefined),
      removeAllDocuments: vi.fn().mockResolvedValue(undefined),
      deletePage: vi.fn().mockResolvedValue(undefined),
    };

    mockScraperService = {
      // Mock scrape to allow simulation of progress callbacks
      scrape: vi.fn().mockImplementation(async (_options, _progressCallback, _signal) => {
        // Default: simulate immediate completion with no documents
        return Promise.resolve();
      }),
    };

    mockCallbacks = {
      onJobProgress: vi.fn().mockResolvedValue(undefined),
      onJobError: vi.fn().mockResolvedValue(undefined),
      onJobStatusChange: vi.fn().mockResolvedValue(undefined), // Not used by worker directly, but part of type
    };

    worker = new PipelineWorker(
      mockStore as DocumentManagementService,
      mockScraperService as ScraperService,
    );

    // Create a default mock job for tests
    abortController = new AbortController();
    mockJob = {
      id: "test-job-id",
      library: "test-lib",
      version: "1.0.0",
      status: PipelineJobStatus.RUNNING, // Assume worker receives a running job
      progress: null,
      error: null,
      createdAt: new Date(),
      startedAt: new Date(),
      finishedAt: null,
      abortController: abortController,
      completionPromise: Promise.resolve(), // Mock promise parts if needed, but worker doesn't use them directly
      resolveCompletion: vi.fn(),
      rejectCompletion: vi.fn(),
      sourceUrl: "http://example.com",
      scraperOptions: {
        url: "http://example.com",
        library: "test-lib",
        version: "1.0.0",
        maxPages: 10,
        maxDepth: 1,
      },
    };
  });

  it("should execute job successfully, calling scrape, addScrapeResult, and onJobProgress", async () => {
    const mockProcessed1: ScrapeResult = {
      textContent: "doc1",
      url: "url1",
      title: "Doc 1",
      contentType: "text/html",
      chunks: [],
      links: [],
      errors: [],
    };
    const mockProcessed2: ScrapeResult = {
      textContent: "doc2",
      url: "url2",
      title: "Doc 2",
      contentType: "text/html",
      chunks: [],
      links: [],
      errors: [],
    };

    // Configure mock scrape to yield progress
    (mockScraperService.scrape as Mock).mockImplementation(
      async (_options, progressCallback, _signal) => {
        const progress1: ScraperProgressEvent = {
          pagesScraped: 1,
          totalPages: 2,
          currentUrl: "url1",
          depth: 1,
          maxDepth: 1,
          result: mockProcessed1,
          totalDiscovered: 0,
        };
        await progressCallback(progress1);

        const progress2: ScraperProgressEvent = {
          pagesScraped: 2,
          totalPages: 2,
          currentUrl: "url2",
          depth: 1,
          maxDepth: 1,
          result: mockProcessed2,
          totalDiscovered: 0,
        };
        await progressCallback(progress2);
      },
    );

    await worker.executeJob(mockJob, mockCallbacks);

    // Verify documents were cleared before scraping started
    expect(mockStore.removeAllDocuments).toHaveBeenCalledOnce();
    expect(mockStore.removeAllDocuments).toHaveBeenCalledWith(
      mockJob.library,
      mockJob.version,
    );

    // Verify scrape was called with the complete scraper options
    expect(mockScraperService.scrape).toHaveBeenCalledOnce();
    expect(mockScraperService.scrape).toHaveBeenCalledWith(
      mockJob.scraperOptions, // Now passes the complete options directly
      expect.any(Function), // The progress callback
      abortController.signal,
    );

    // Verify addScrapeResult was called for each document
    expect(mockStore.addScrapeResult).toHaveBeenCalledTimes(2);
    expect(mockStore.addScrapeResult).toHaveBeenCalledWith(
      mockJob.library,
      mockJob.version,
      1,
      mockProcessed1,
    );
    expect(mockStore.addScrapeResult).toHaveBeenCalledWith(
      mockJob.library,
      mockJob.version,
      1,
      mockProcessed2,
    );

    // Verify onJobProgress was called
    expect(mockCallbacks.onJobProgress).toHaveBeenCalledTimes(2);
    expect(mockCallbacks.onJobProgress).toHaveBeenCalledWith(
      mockJob,
      expect.objectContaining({ result: mockProcessed1 }),
    );
    expect(mockCallbacks.onJobProgress).toHaveBeenCalledWith(
      mockJob,
      expect.objectContaining({ result: mockProcessed2 }),
    );

    // Verify job progress object was NOT updated directly by worker
    // The worker should only call callbacks - the manager handles progress updates
    expect(mockJob.progress).toBeNull(); // Should remain null since worker doesn't update it directly

    // Verify no errors were reported
    expect(mockCallbacks.onJobError).not.toHaveBeenCalled();
  });

  it("should re-throw error if scraperService.scrape fails", async () => {
    const scraperError = new Error("Scraper failed");
    (mockScraperService.scrape as Mock).mockRejectedValue(scraperError);

    await expect(worker.executeJob(mockJob, mockCallbacks)).rejects.toThrow(scraperError);

    // Verify dependencies were called appropriately
    expect(mockScraperService.scrape).toHaveBeenCalledOnce();
    expect(mockStore.addScrapeResult).not.toHaveBeenCalled();
    expect(mockCallbacks.onJobProgress).not.toHaveBeenCalled();
    expect(mockCallbacks.onJobError).not.toHaveBeenCalled();
  });

  it("should call onJobError and continue if store.addScrapeResult fails", async () => {
    const mockProcessed: ScrapeResult = {
      textContent: "doc1",
      url: "url1",
      title: "Doc 1",
      contentType: "text/html",
      chunks: [],
      links: [],
      errors: [],
    };
    const storeError = new Error("Database error");

    // Simulate scrape yielding one document
    (mockScraperService.scrape as Mock).mockImplementation(
      async (_options, progressCallback, _signal) => {
        const progress: ScraperProgressEvent = {
          pagesScraped: 1,
          totalPages: 1,
          currentUrl: "url1",
          depth: 1,
          maxDepth: 1,
          result: mockProcessed,
          totalDiscovered: 0,
        };
        await progressCallback(progress);
      },
    );

    // Simulate addScrapeResult failing
    (mockStore.addScrapeResult as Mock).mockRejectedValue(storeError);

    // Execute the job - should complete despite the error
    await expect(worker.executeJob(mockJob, mockCallbacks)).resolves.toBeUndefined();

    // Verify scrape was called
    expect(mockScraperService.scrape).toHaveBeenCalledOnce();
    // Verify addScrapeResult was called
    expect(mockStore.addScrapeResult).toHaveBeenCalledOnce();
    // Verify onJobProgress was called
    expect(mockCallbacks.onJobProgress).toHaveBeenCalledOnce();
    // Verify onJobError was called with the page that failed
    expect(mockCallbacks.onJobError).toHaveBeenCalledOnce();
    expect(mockCallbacks.onJobError).toHaveBeenCalledWith(
      mockJob,
      storeError,
      mockProcessed,
    );
  });

  it("should throw CancellationError if cancelled during scrape progress", async () => {
    const mockProcessed: ScrapeResult = {
      textContent: "doc1",
      url: "url1",
      title: "Doc 1",
      contentType: "text/html",
      chunks: [],
      links: [],
      errors: [],
    };

    // Simulate scrape checking signal and throwing
    (mockScraperService.scrape as Mock).mockImplementation(
      async (_options, progressCallback, _signal) => {
        const progress: ScraperProgressEvent = {
          pagesScraped: 1,
          totalPages: 2,
          currentUrl: "url1",
          depth: 1,
          maxDepth: 1,
          result: mockProcessed,
          totalDiscovered: 0,
        };
        // Simulate cancellation happening *before* progress is processed by worker
        abortController.abort();
        // The worker's callback wrapper will check signal and throw
        await progressCallback(progress);
        // This part should not be reached
        throw new Error("Should have been cancelled");
      },
    );

    // Call executeJob once and check the specific error message
    await expect(worker.executeJob(mockJob, mockCallbacks)).rejects.toThrow(
      "Job cancelled during scraping progress",
    );
    // Also verify it's an instance of CancellationError if needed, though message check is often sufficient
    // await expect(worker.executeJob(mockJob, mockCallbacks)).rejects.toBeInstanceOf(CancellationError);

    // Verify scrape was called
    expect(mockScraperService.scrape).toHaveBeenCalledOnce();
    // Verify addScrapeResult was NOT called
    expect(mockStore.addScrapeResult).not.toHaveBeenCalled();
    // Verify onJobProgress was NOT called because cancellation check happens first
    expect(mockCallbacks.onJobProgress).not.toHaveBeenCalled();
    // Verify onJobError was NOT called
    expect(mockCallbacks.onJobError).not.toHaveBeenCalled();
  });

  it("should throw CancellationError if cancelled after scrape completes", async () => {
    // Simulate scrape completing successfully
    (mockScraperService.scrape as Mock).mockImplementation(
      async (_options, _progressCallback, _signal) => {
        // No progress needed for this test
        return Promise.resolve();
      },
    );

    // Abort *after* scrape would have finished but before worker checks again
    abortController.abort();

    // Call executeJob once and check the specific error message
    await expect(worker.executeJob(mockJob, mockCallbacks)).rejects.toThrow(
      "Job cancelled",
    );
    // Also verify it's an instance of CancellationError if needed
    // await expect(worker.executeJob(mockJob, mockCallbacks)).rejects.toBeInstanceOf(CancellationError);

    // Verify scrape was called (now only once)
    expect(mockScraperService.scrape).toHaveBeenCalledOnce();
    // Verify other callbacks not called
    expect(mockStore.addScrapeResult).not.toHaveBeenCalled();
    expect(mockCallbacks.onJobProgress).not.toHaveBeenCalled();
    expect(mockCallbacks.onJobError).not.toHaveBeenCalled();
  });

  it("should fail the job if document deletion fails during refresh", async () => {
    const deletionError = new Error("Database deletion failed");

    // Simulate scrape yielding a deletion event (404 page)
    (mockScraperService.scrape as Mock).mockImplementation(
      async (_options, progressCallback, _signal) => {
        const progress: ScraperProgressEvent = {
          pagesScraped: 1,
          totalPages: 1,
          currentUrl: "url1",
          depth: 1,
          maxDepth: 1,
          deleted: true, // This is a deletion event
          result: null,
          pageId: 123, // Page ID to delete
          totalDiscovered: 0,
        };
        await progressCallback(progress);
      },
    );

    // Simulate deletePage failing
    (mockStore.deletePage as Mock).mockRejectedValue(deletionError);

    // Execute the job - should fail due to deletion error
    await expect(worker.executeJob(mockJob, mockCallbacks)).rejects.toThrow(
      "Database deletion failed",
    );

    // Verify scrape was called
    expect(mockScraperService.scrape).toHaveBeenCalledOnce();
    // Verify deletion was attempted
    expect(mockStore.deletePage).toHaveBeenCalledWith(123);
    // Verify onJobProgress was called
    expect(mockCallbacks.onJobProgress).toHaveBeenCalledOnce();
    // Verify onJobError was called with the deletion error
    expect(mockCallbacks.onJobError).toHaveBeenCalledOnce();
    expect(mockCallbacks.onJobError).toHaveBeenCalledWith(mockJob, deletionError);
    // Verify addScrapeResult was NOT called (deletion failed before that)
    expect(mockStore.addScrapeResult).not.toHaveBeenCalled();
  });

  describe("Database operations based on fetch status", () => {
    it("should perform NO database writes for a 304 Not Modified status", async () => {
      // Simulate scrape yielding a 304 Not Modified event
      (mockScraperService.scrape as Mock).mockImplementation(
        async (_options, progressCallback, _signal) => {
          const progress: ScraperProgressEvent = {
            pagesScraped: 1,
            totalPages: 1,
            currentUrl: "url1",
            depth: 1,
            maxDepth: 1,
            result: null, // No result for 304
            deleted: false,
            pageId: 123, // Page ID from refresh queue
            totalDiscovered: 0,
          };
          await progressCallback(progress);
        },
      );

      await worker.executeJob(mockJob, mockCallbacks);

      // Verify NO database operations were performed
      expect(mockStore.deletePage).not.toHaveBeenCalled();
      expect(mockStore.addScrapeResult).not.toHaveBeenCalled();

      // Verify progress was still reported
      expect(mockCallbacks.onJobProgress).toHaveBeenCalledOnce();
      expect(mockCallbacks.onJobProgress).toHaveBeenCalledWith(
        mockJob,
        expect.objectContaining({
          result: null,
          deleted: false,
          pageId: 123,
        }),
      );
    });

    it("should DELETE existing documents and INSERT new ones for a 200 OK status on an existing page", async () => {
      const mockResult: ScrapeResult = {
        textContent: "updated content",
        url: "url1",
        title: "Updated Doc",
        contentType: "text/html",
        chunks: [],
        links: [],
        errors: [],
      };

      // Simulate scrape yielding a 200 OK event with pageId (existing page)
      (mockScraperService.scrape as Mock).mockImplementation(
        async (_options, progressCallback, _signal) => {
          const progress: ScraperProgressEvent = {
            pagesScraped: 1,
            totalPages: 1,
            currentUrl: "url1",
            depth: 1,
            maxDepth: 1,
            result: mockResult,
            pageId: 123, // Existing page ID
            totalDiscovered: 0,
          };
          await progressCallback(progress);
        },
      );

      await worker.executeJob(mockJob, mockCallbacks);

      // Verify DELETE was called first
      expect(mockStore.deletePage).toHaveBeenCalledOnce();
      expect(mockStore.deletePage).toHaveBeenCalledWith(123);

      // Verify INSERT (addScrapeResult) was called after deletion
      expect(mockStore.addScrapeResult).toHaveBeenCalledOnce();
      expect(mockStore.addScrapeResult).toHaveBeenCalledWith(
        mockJob.library,
        mockJob.version,
        1,
        mockResult,
      );

      // Verify call order: delete before add
      const deleteCallOrder = (mockStore.deletePage as Mock).mock.invocationCallOrder[0];
      const addCallOrder = (mockStore.addScrapeResult as Mock).mock
        .invocationCallOrder[0];
      expect(deleteCallOrder).toBeLessThan(addCallOrder);

      // Verify progress was reported
      expect(mockCallbacks.onJobProgress).toHaveBeenCalledOnce();
    });

    it("should INSERT new documents for a 200 OK status on a new page", async () => {
      const mockResult: ScrapeResult = {
        textContent: "new content",
        url: "url2",
        title: "New Doc",
        contentType: "text/html",
        chunks: [],
        links: [],
        errors: [],
      };

      // Simulate scrape yielding a 200 OK event without pageId (new page)
      (mockScraperService.scrape as Mock).mockImplementation(
        async (_options, progressCallback, _signal) => {
          const progress: ScraperProgressEvent = {
            pagesScraped: 1,
            totalPages: 1,
            currentUrl: "url2",
            depth: 1,
            maxDepth: 1,
            result: mockResult,
            pageId: undefined, // No pageId = new page
            totalDiscovered: 0,
          };
          await progressCallback(progress);
        },
      );

      await worker.executeJob(mockJob, mockCallbacks);

      // Verify NO deletion was performed (new page)
      expect(mockStore.deletePage).not.toHaveBeenCalled();

      // Verify INSERT (addScrapeResult) was called
      expect(mockStore.addScrapeResult).toHaveBeenCalledOnce();
      expect(mockStore.addScrapeResult).toHaveBeenCalledWith(
        mockJob.library,
        mockJob.version,
        1,
        mockResult,
      );

      // Verify progress was reported
      expect(mockCallbacks.onJobProgress).toHaveBeenCalledOnce();
    });

    it("should call deletePage for a 404 Not Found status", async () => {
      // Simulate scrape yielding a 404 Not Found event
      (mockScraperService.scrape as Mock).mockImplementation(
        async (_options, progressCallback, _signal) => {
          const progress: ScraperProgressEvent = {
            pagesScraped: 1,
            totalPages: 1,
            currentUrl: "url1",
            depth: 1,
            maxDepth: 1,
            result: null,
            deleted: true, // 404 - page was deleted
            pageId: 123,
            totalDiscovered: 0,
          };
          await progressCallback(progress);
        },
      );

      await worker.executeJob(mockJob, mockCallbacks);

      // Verify deletion was called
      expect(mockStore.deletePage).toHaveBeenCalledOnce();
      expect(mockStore.deletePage).toHaveBeenCalledWith(123);

      // Verify NO insert was performed
      expect(mockStore.addScrapeResult).not.toHaveBeenCalled();

      // Verify progress was reported
      expect(mockCallbacks.onJobProgress).toHaveBeenCalledOnce();
      expect(mockCallbacks.onJobProgress).toHaveBeenCalledWith(
        mockJob,
        expect.objectContaining({
          deleted: true,
          pageId: 123,
        }),
      );
    });
  });
});
