Skip to main content
Glama

n8n-MCP

by 88-888
batch-processor.test.ts23.3 kB
import { describe, it, expect, beforeEach, vi, afterEach, beforeAll, afterAll, type MockInstance } from 'vitest'; import { TelemetryBatchProcessor } from '../../../src/telemetry/batch-processor'; import { TelemetryEvent, WorkflowTelemetry, TELEMETRY_CONFIG } from '../../../src/telemetry/telemetry-types'; import { TelemetryError, TelemetryErrorType } from '../../../src/telemetry/telemetry-error'; import type { SupabaseClient } from '@supabase/supabase-js'; // Mock logger to avoid console output in tests vi.mock('../../../src/utils/logger', () => ({ logger: { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), } })); describe('TelemetryBatchProcessor', () => { let batchProcessor: TelemetryBatchProcessor; let mockSupabase: SupabaseClient; let mockIsEnabled: ReturnType<typeof vi.fn>; let mockProcessExit: MockInstance; const createMockSupabaseResponse = (error: any = null) => ({ data: null, error, status: error ? 400 : 200, statusText: error ? 'Bad Request' : 'OK', count: null }); beforeEach(() => { vi.useFakeTimers(); mockIsEnabled = vi.fn().mockReturnValue(true); mockSupabase = { from: vi.fn().mockReturnValue({ insert: vi.fn().mockResolvedValue(createMockSupabaseResponse()) }) } as any; // Mock process events to prevent actual exit mockProcessExit = vi.spyOn(process, 'exit').mockImplementation((() => { // Do nothing - just prevent actual exit }) as any); vi.clearAllMocks(); batchProcessor = new TelemetryBatchProcessor(mockSupabase, mockIsEnabled); }); afterEach(() => { // Stop the batch processor to clear any intervals batchProcessor.stop(); mockProcessExit.mockRestore(); vi.clearAllTimers(); vi.useRealTimers(); }); describe('start()', () => { it('should start periodic flushing when enabled', () => { const setIntervalSpy = vi.spyOn(global, 'setInterval'); batchProcessor.start(); expect(setIntervalSpy).toHaveBeenCalledWith( expect.any(Function), TELEMETRY_CONFIG.BATCH_FLUSH_INTERVAL ); }); it('should not start when disabled', () => { mockIsEnabled.mockReturnValue(false); const setIntervalSpy = vi.spyOn(global, 'setInterval'); batchProcessor.start(); expect(setIntervalSpy).not.toHaveBeenCalled(); }); it('should not start without Supabase client', () => { const processor = new TelemetryBatchProcessor(null, mockIsEnabled); const setIntervalSpy = vi.spyOn(global, 'setInterval'); processor.start(); expect(setIntervalSpy).not.toHaveBeenCalled(); processor.stop(); }); it('should set up process exit handlers', () => { const onSpy = vi.spyOn(process, 'on'); batchProcessor.start(); expect(onSpy).toHaveBeenCalledWith('beforeExit', expect.any(Function)); expect(onSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); expect(onSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function)); }); }); describe('stop()', () => { it('should clear flush timer', () => { const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); batchProcessor.start(); batchProcessor.stop(); expect(clearIntervalSpy).toHaveBeenCalled(); }); }); describe('flush()', () => { const mockEvents: TelemetryEvent[] = [ { user_id: 'user1', event: 'tool_used', properties: { tool: 'httpRequest', success: true } }, { user_id: 'user2', event: 'tool_used', properties: { tool: 'webhook', success: false } } ]; const mockWorkflows: WorkflowTelemetry[] = [ { user_id: 'user1', workflow_hash: 'hash1', node_count: 3, node_types: ['webhook', 'httpRequest', 'set'], has_trigger: true, has_webhook: true, complexity: 'medium', sanitized_workflow: { nodes: [], connections: {} } } ]; it('should flush events successfully', async () => { await batchProcessor.flush(mockEvents); expect(mockSupabase.from).toHaveBeenCalledWith('telemetry_events'); expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledWith(mockEvents); const metrics = batchProcessor.getMetrics(); expect(metrics.eventsTracked).toBe(2); expect(metrics.batchesSent).toBe(1); }); it('should flush workflows successfully', async () => { await batchProcessor.flush(undefined, mockWorkflows); expect(mockSupabase.from).toHaveBeenCalledWith('telemetry_workflows'); expect(mockSupabase.from('telemetry_workflows').insert).toHaveBeenCalledWith(mockWorkflows); const metrics = batchProcessor.getMetrics(); expect(metrics.eventsTracked).toBe(1); expect(metrics.batchesSent).toBe(1); }); it('should flush both events and workflows', async () => { await batchProcessor.flush(mockEvents, mockWorkflows); expect(mockSupabase.from).toHaveBeenCalledWith('telemetry_events'); expect(mockSupabase.from).toHaveBeenCalledWith('telemetry_workflows'); const metrics = batchProcessor.getMetrics(); expect(metrics.eventsTracked).toBe(3); // 2 events + 1 workflow expect(metrics.batchesSent).toBe(2); }); it('should not flush when disabled', async () => { mockIsEnabled.mockReturnValue(false); await batchProcessor.flush(mockEvents, mockWorkflows); expect(mockSupabase.from).not.toHaveBeenCalled(); }); it('should not flush without Supabase client', async () => { const processor = new TelemetryBatchProcessor(null, mockIsEnabled); await processor.flush(mockEvents); expect(mockSupabase.from).not.toHaveBeenCalled(); }); it('should skip flush when circuit breaker is open', async () => { // Open circuit breaker by failing multiple times const errorResponse = createMockSupabaseResponse(new Error('Network error')); vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse); // Fail enough times to open circuit breaker (5 by default) for (let i = 0; i < 5; i++) { await batchProcessor.flush(mockEvents); } const metrics = batchProcessor.getMetrics(); expect(metrics.circuitBreakerState.state).toBe('open'); // Next flush should be skipped vi.clearAllMocks(); await batchProcessor.flush(mockEvents); expect(mockSupabase.from).not.toHaveBeenCalled(); expect(batchProcessor.getMetrics().eventsDropped).toBeGreaterThan(0); }); it('should record flush time metrics', async () => { const startTime = Date.now(); await batchProcessor.flush(mockEvents); const metrics = batchProcessor.getMetrics(); expect(metrics.averageFlushTime).toBeGreaterThanOrEqual(0); expect(metrics.lastFlushTime).toBeGreaterThanOrEqual(0); }); }); describe('batch creation', () => { it('should create single batch for small datasets', async () => { const events: TelemetryEvent[] = Array.from({ length: 10 }, (_, i) => ({ user_id: `user${i}`, event: 'test_event', properties: { index: i } })); await batchProcessor.flush(events); expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledTimes(1); expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledWith(events); }); it('should create multiple batches for large datasets', async () => { const events: TelemetryEvent[] = Array.from({ length: 75 }, (_, i) => ({ user_id: `user${i}`, event: 'test_event', properties: { index: i } })); await batchProcessor.flush(events); // Should create 2 batches (50 + 25) based on TELEMETRY_CONFIG.MAX_BATCH_SIZE expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledTimes(2); const firstCall = vi.mocked(mockSupabase.from('telemetry_events').insert).mock.calls[0][0]; const secondCall = vi.mocked(mockSupabase.from('telemetry_events').insert).mock.calls[1][0]; expect(firstCall).toHaveLength(TELEMETRY_CONFIG.MAX_BATCH_SIZE); expect(secondCall).toHaveLength(25); }); }); describe('workflow deduplication', () => { it('should deduplicate workflows by hash', async () => { const workflows: WorkflowTelemetry[] = [ { user_id: 'user1', workflow_hash: 'hash1', node_count: 2, node_types: ['webhook', 'set'], has_trigger: true, has_webhook: true, complexity: 'simple', sanitized_workflow: { nodes: [], connections: {} } }, { user_id: 'user2', workflow_hash: 'hash1', // Same hash - should be deduplicated node_count: 2, node_types: ['webhook', 'set'], has_trigger: true, has_webhook: true, complexity: 'simple', sanitized_workflow: { nodes: [], connections: {} } }, { user_id: 'user1', workflow_hash: 'hash2', // Different hash - should be kept node_count: 3, node_types: ['webhook', 'httpRequest', 'set'], has_trigger: true, has_webhook: true, complexity: 'medium', sanitized_workflow: { nodes: [], connections: {} } } ]; await batchProcessor.flush(undefined, workflows); const insertCall = vi.mocked(mockSupabase.from('telemetry_workflows').insert).mock.calls[0][0]; expect(insertCall).toHaveLength(2); // Should deduplicate to 2 workflows const hashes = insertCall.map((w: WorkflowTelemetry) => w.workflow_hash); expect(hashes).toEqual(['hash1', 'hash2']); }); }); describe('error handling and retries', () => { it('should retry on failure with exponential backoff', async () => { const error = new Error('Network timeout'); const errorResponse = createMockSupabaseResponse(error); // Mock to fail first 2 times, then succeed vi.mocked(mockSupabase.from('telemetry_events').insert) .mockResolvedValueOnce(errorResponse) .mockResolvedValueOnce(errorResponse) .mockResolvedValueOnce(createMockSupabaseResponse()); const events: TelemetryEvent[] = [{ user_id: 'user1', event: 'test_event', properties: {} }]; await batchProcessor.flush(events); // Should have been called 3 times (2 failures + 1 success) expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledTimes(3); const metrics = batchProcessor.getMetrics(); expect(metrics.eventsTracked).toBe(1); // Should succeed on third try }); it('should fail after max retries', async () => { const error = new Error('Persistent network error'); const errorResponse = createMockSupabaseResponse(error); vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse); const events: TelemetryEvent[] = [{ user_id: 'user1', event: 'test_event', properties: {} }]; await batchProcessor.flush(events); // Should have been called MAX_RETRIES times expect(mockSupabase.from('telemetry_events').insert) .toHaveBeenCalledTimes(TELEMETRY_CONFIG.MAX_RETRIES); const metrics = batchProcessor.getMetrics(); expect(metrics.eventsFailed).toBe(1); expect(metrics.batchesFailed).toBe(1); expect(metrics.deadLetterQueueSize).toBe(1); }); it('should handle operation timeout', async () => { // Mock the operation to always fail with timeout error vi.mocked(mockSupabase.from('telemetry_events').insert).mockRejectedValue( new Error('Operation timed out') ); const events: TelemetryEvent[] = [{ user_id: 'user1', event: 'test_event', properties: {} }]; // The flush should fail after retries await batchProcessor.flush(events); const metrics = batchProcessor.getMetrics(); expect(metrics.eventsFailed).toBe(1); }); }); describe('dead letter queue', () => { it('should add failed events to dead letter queue', async () => { const error = new Error('Persistent error'); const errorResponse = createMockSupabaseResponse(error); vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse); const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'event1', properties: {} }, { user_id: 'user2', event: 'event2', properties: {} } ]; await batchProcessor.flush(events); const metrics = batchProcessor.getMetrics(); expect(metrics.deadLetterQueueSize).toBe(2); }); it('should process dead letter queue when circuit is healthy', async () => { const error = new Error('Temporary error'); const errorResponse = createMockSupabaseResponse(error); // First 3 calls fail (for all retries), then succeed vi.mocked(mockSupabase.from('telemetry_events').insert) .mockResolvedValueOnce(errorResponse) // Retry 1 .mockResolvedValueOnce(errorResponse) // Retry 2 .mockResolvedValueOnce(errorResponse) // Retry 3 .mockResolvedValueOnce(createMockSupabaseResponse()); // Success on next flush const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'event1', properties: {} } ]; // First flush - should fail after all retries and add to dead letter queue await batchProcessor.flush(events); expect(batchProcessor.getMetrics().deadLetterQueueSize).toBe(1); // Second flush - should process dead letter queue await batchProcessor.flush([]); expect(batchProcessor.getMetrics().deadLetterQueueSize).toBe(0); }); it('should maintain dead letter queue size limit', async () => { const error = new Error('Persistent error'); const errorResponse = createMockSupabaseResponse(error); // Always fail - each flush will retry 3 times then add to dead letter queue vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse); // Circuit breaker opens after 5 failures, so only first 5 flushes will be processed // 5 batches of 5 items = 25 total items in dead letter queue for (let i = 0; i < 10; i++) { const events: TelemetryEvent[] = Array.from({ length: 5 }, (_, j) => ({ user_id: `user${i}_${j}`, event: 'test_event', properties: { batch: i, index: j } })); await batchProcessor.flush(events); } const metrics = batchProcessor.getMetrics(); // Circuit breaker opens after 5 failures, so only 25 items are added expect(metrics.deadLetterQueueSize).toBe(25); // 5 flushes * 5 items each expect(metrics.eventsDropped).toBe(25); // 5 additional flushes dropped due to circuit breaker }); it('should handle mixed events and workflows in dead letter queue', async () => { const error = new Error('Mixed error'); const errorResponse = createMockSupabaseResponse(error); vi.mocked(mockSupabase.from).mockImplementation((table) => ({ insert: vi.fn().mockResolvedValue(errorResponse), url: { href: '' }, headers: {}, select: vi.fn(), upsert: vi.fn(), update: vi.fn(), delete: vi.fn() } as any)); const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'event1', properties: {} } ]; const workflows: WorkflowTelemetry[] = [ { user_id: 'user1', workflow_hash: 'hash1', node_count: 1, node_types: ['webhook'], has_trigger: true, has_webhook: true, complexity: 'simple', sanitized_workflow: { nodes: [], connections: {} } } ]; await batchProcessor.flush(events, workflows); expect(batchProcessor.getMetrics().deadLetterQueueSize).toBe(2); // Mock successful operations for dead letter queue processing vi.mocked(mockSupabase.from).mockImplementation((table) => ({ insert: vi.fn().mockResolvedValue(createMockSupabaseResponse()), url: { href: '' }, headers: {}, select: vi.fn(), upsert: vi.fn(), update: vi.fn(), delete: vi.fn() } as any)); await batchProcessor.flush([]); expect(batchProcessor.getMetrics().deadLetterQueueSize).toBe(0); }); }); describe('circuit breaker integration', () => { it('should update circuit breaker on success', async () => { const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'test_event', properties: {} } ]; await batchProcessor.flush(events); const metrics = batchProcessor.getMetrics(); expect(metrics.circuitBreakerState.state).toBe('closed'); expect(metrics.circuitBreakerState.failureCount).toBe(0); }); it('should update circuit breaker on failure', async () => { const error = new Error('Network error'); const errorResponse = createMockSupabaseResponse(error); vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse); const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'test_event', properties: {} } ]; await batchProcessor.flush(events); const metrics = batchProcessor.getMetrics(); expect(metrics.circuitBreakerState.failureCount).toBeGreaterThan(0); }); }); describe('metrics collection', () => { it('should collect comprehensive metrics', async () => { const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'event1', properties: {} }, { user_id: 'user2', event: 'event2', properties: {} } ]; await batchProcessor.flush(events); const metrics = batchProcessor.getMetrics(); expect(metrics).toHaveProperty('eventsTracked'); expect(metrics).toHaveProperty('eventsDropped'); expect(metrics).toHaveProperty('eventsFailed'); expect(metrics).toHaveProperty('batchesSent'); expect(metrics).toHaveProperty('batchesFailed'); expect(metrics).toHaveProperty('averageFlushTime'); expect(metrics).toHaveProperty('lastFlushTime'); expect(metrics).toHaveProperty('rateLimitHits'); expect(metrics).toHaveProperty('circuitBreakerState'); expect(metrics).toHaveProperty('deadLetterQueueSize'); expect(metrics.eventsTracked).toBe(2); expect(metrics.batchesSent).toBe(1); }); it('should track flush time statistics', async () => { const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'test_event', properties: {} } ]; // Perform multiple flushes to test average calculation await batchProcessor.flush(events); await batchProcessor.flush(events); await batchProcessor.flush(events); const metrics = batchProcessor.getMetrics(); expect(metrics.averageFlushTime).toBeGreaterThanOrEqual(0); expect(metrics.lastFlushTime).toBeGreaterThanOrEqual(0); }); it('should maintain limited flush time history', async () => { const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'test_event', properties: {} } ]; // Perform more than 100 flushes to test history limit for (let i = 0; i < 105; i++) { await batchProcessor.flush(events); } // Should still calculate average correctly (history is limited internally) const metrics = batchProcessor.getMetrics(); expect(metrics.averageFlushTime).toBeGreaterThanOrEqual(0); }); }); describe('resetMetrics()', () => { it('should reset all metrics to initial state', async () => { const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'test_event', properties: {} } ]; // Generate some metrics await batchProcessor.flush(events); // Verify metrics exist let metrics = batchProcessor.getMetrics(); expect(metrics.eventsTracked).toBeGreaterThan(0); expect(metrics.batchesSent).toBeGreaterThan(0); // Reset metrics batchProcessor.resetMetrics(); // Verify reset metrics = batchProcessor.getMetrics(); expect(metrics.eventsTracked).toBe(0); expect(metrics.eventsDropped).toBe(0); expect(metrics.eventsFailed).toBe(0); expect(metrics.batchesSent).toBe(0); expect(metrics.batchesFailed).toBe(0); expect(metrics.averageFlushTime).toBe(0); expect(metrics.rateLimitHits).toBe(0); expect(metrics.circuitBreakerState.state).toBe('closed'); expect(metrics.circuitBreakerState.failureCount).toBe(0); }); }); describe('edge cases', () => { it('should handle empty arrays gracefully', async () => { await batchProcessor.flush([], []); expect(mockSupabase.from).not.toHaveBeenCalled(); const metrics = batchProcessor.getMetrics(); expect(metrics.eventsTracked).toBe(0); expect(metrics.batchesSent).toBe(0); }); it('should handle undefined inputs gracefully', async () => { await batchProcessor.flush(); expect(mockSupabase.from).not.toHaveBeenCalled(); }); it('should handle null Supabase client gracefully', async () => { const processor = new TelemetryBatchProcessor(null, mockIsEnabled); const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'test_event', properties: {} } ]; await expect(processor.flush(events)).resolves.not.toThrow(); }); it('should handle concurrent flush operations', async () => { const events: TelemetryEvent[] = [ { user_id: 'user1', event: 'test_event', properties: {} } ]; // Start multiple flush operations concurrently const flushPromises = [ batchProcessor.flush(events), batchProcessor.flush(events), batchProcessor.flush(events) ]; await Promise.all(flushPromises); // Should handle concurrent operations gracefully const metrics = batchProcessor.getMetrics(); expect(metrics.eventsTracked).toBeGreaterThan(0); }); }); describe('process lifecycle integration', () => { it('should flush on process beforeExit', async () => { const flushSpy = vi.spyOn(batchProcessor, 'flush'); batchProcessor.start(); // Trigger beforeExit event process.emit('beforeExit', 0); expect(flushSpy).toHaveBeenCalled(); }); it('should flush and exit on SIGINT', async () => { const flushSpy = vi.spyOn(batchProcessor, 'flush'); batchProcessor.start(); // Trigger SIGINT event process.emit('SIGINT', 'SIGINT'); expect(flushSpy).toHaveBeenCalled(); expect(mockProcessExit).toHaveBeenCalledWith(0); }); it('should flush and exit on SIGTERM', async () => { const flushSpy = vi.spyOn(batchProcessor, 'flush'); batchProcessor.start(); // Trigger SIGTERM event process.emit('SIGTERM', 'SIGTERM'); expect(flushSpy).toHaveBeenCalled(); expect(mockProcessExit).toHaveBeenCalledWith(0); }); }); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/88-888/n8n-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server