# Pipeline Architecture

## Overview

The pipeline system manages asynchronous document processing with persistent job state and coordinated execution across embedded or external workers.

## Core Components

### PipelineFactory

Central factory that selects pipeline implementation based on configuration:

```typescript
interface PipelineOptions {
  recoverJobs?: boolean; // Enable job recovery from database
  serverUrl?: string; // External worker URL
  concurrency?: number; // Worker concurrency limit
}
```

**Selection Logic:**

- `serverUrl` specified → PipelineClient (external worker)
- `recoverJobs: true` → PipelineManager with recovery
- `recoverJobs: false` → PipelineManager without recovery

A sketch of this selection logic appears below, following the Write-Through Architecture section.

### PipelineManager

Manages job queue and worker coordination for embedded processing:

**Responsibilities:**

- Job queue management with concurrency limits
- Worker lifecycle management
- Progress tracking and status updates
- Database state synchronization
- Job recovery after restart

**Job Recovery:**

- Loads QUEUED and RUNNING jobs from database on startup
- Resets RUNNING jobs to QUEUED for re-execution
- Maintains job configuration for reproducible processing

### PipelineClient

Type-safe tRPC client providing identical interface to PipelineManager for external worker communication:

**Features:**

- tRPC client for remote job operations over HTTP
- Identical method signatures to PipelineManager
- Error handling and connection management
- Connectivity check via ping

### PipelineWorker

Executes individual jobs with progress reporting:

**Execution Flow:**

1. Fetch job configuration from queue
2. Initialize scraper with job parameters
3. Process content through scraper pipeline
4. Update progress via callbacks
5. Store results and mark completion

## Job Lifecycle

### Job States

```
QUEUED → RUNNING → COMPLETED
            ↓
          FAILED
            ↓
        CANCELLED
```

### State Transitions

- **QUEUED**: Job created, waiting for worker
- **RUNNING**: Worker processing job
- **COMPLETED**: Successful completion
- **FAILED**: Error during processing
- **CANCELLED**: Manual cancellation

### Progress Tracking

Jobs report progress through callback mechanism:

- Pages discovered and processed
- Current processing status
- Error messages and warnings
- Estimated completion time

## Write-Through Architecture

### Single Source of Truth

PipelineJob objects contain both runtime state and database fields, ensuring consistency between memory and persistent storage.

### Immediate Persistence

All state changes immediately write to database:

- Status transitions
- Progress updates
- Error information
- Configuration changes

### Recovery Mechanism

Database state enables automatic recovery:

1. Load pending jobs on startup
2. Reset RUNNING jobs to QUEUED
3. Resume processing with original configuration
4. Maintain progress history
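To make the recovery flow concrete, here is a minimal sketch of the startup reset described above. The status values mirror the job lifecycle states; the `JobStore` interface and its method names are illustrative assumptions, not the project's actual persistence API.

```typescript
// Minimal sketch of startup job recovery (hypothetical types and names).
type JobStatus = "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED" | "CANCELLED";

interface PipelineJob {
  id: string;
  status: JobStatus;
  config: Record<string, unknown>; // stored scraper configuration
}

// Assumed persistence interface; the real project persists jobs in a database.
interface JobStore {
  findByStatus(statuses: JobStatus[]): Promise<PipelineJob[]>;
  updateStatus(id: string, status: JobStatus): Promise<void>;
}

// On startup: load pending jobs and reset interrupted RUNNING jobs to QUEUED
// so they re-execute with their original configuration.
async function recoverJobs(store: JobStore): Promise<PipelineJob[]> {
  const pending = await store.findByStatus(["QUEUED", "RUNNING"]);
  for (const job of pending) {
    if (job.status === "RUNNING") {
      await store.updateStatus(job.id, "QUEUED");
      job.status = "QUEUED";
    }
  }
  return pending; // ready to re-enqueue in the PipelineManager
}
```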
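For completeness, here is a rough sketch of the selection logic described under PipelineFactory above. The class names come from this document; the constructor signatures and the `createPipeline` function are assumptions for illustration.

```typescript
// Hypothetical stand-ins for the classes described in this document.
declare class PipelineManager {
  constructor(opts: { recoverJobs: boolean; concurrency?: number });
}
declare class PipelineClient {
  constructor(serverUrl: string);
}

interface PipelineOptions {
  recoverJobs?: boolean; // Enable job recovery from database
  serverUrl?: string; // External worker URL
  concurrency?: number; // Worker concurrency limit
}

// Selection logic as described under PipelineFactory:
// serverUrl → external worker client; otherwise an embedded manager,
// with or without job recovery.
function createPipeline(
  options: PipelineOptions,
): PipelineManager | PipelineClient {
  if (options.serverUrl) {
    return new PipelineClient(options.serverUrl);
  }
  return new PipelineManager({
    recoverJobs: options.recoverJobs ?? false,
    concurrency: options.concurrency,
  });
}
```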
## Concurrency Management

### Worker Pool

PipelineManager maintains configurable worker pool:

- Default concurrency: 3 workers
- Configurable via environment or CLI
- Workers process jobs independently
- Queue coordination prevents conflicts

### Job Distribution

Jobs are distributed to available workers using:

- FIFO queue ordering
- Worker availability checking
- Load balancing across workers
- Graceful worker shutdown handling

A simplified dispatch loop illustrating this is sketched at the end of this page.

## External Worker RPC

### Procedures (tRPC)

- `ping` - Connectivity check
- `enqueueJob` - Create new job
- `getJobs` - List jobs with optional filtering
- `getJob` - Get job details
- `cancelJob` - Cancel a job
- `clearCompletedJobs` - Remove completed/cancelled/failed jobs

A minimal router sketch using these procedure names also appears at the end of this page.

### Data Contracts

Requests and responses use shared TypeScript types through tRPC, ensuring end-to-end type safety.

### Error Handling

Errors propagate as structured tRPC errors with messages suitable for user feedback and logs.

## Configuration Persistence

### Job Configuration

Each job stores complete scraper configuration:

- Source URL and scraping parameters
- Library name and version information
- Processing options and filters
- Retry and timeout settings

### Reproducible Processing

Stored configuration enables:

- Exact re-indexing with same parameters
- Configuration auditing and debugging
- Version-specific processing rules
- Consistent results across runs

## Monitoring and Observability

### Progress Reporting

Real-time progress updates through:

- Callback-based progress notifications
- Database persistence of progress state
- Web UI polling for status display
- Log-based progress tracking

### Error Tracking

Comprehensive error information:

- Exception stack traces
- Processing context at failure
- Retry attempt logging
- User-friendly error messages

### Performance Metrics

Job execution metrics:

- Processing duration
- Pages processed per minute
- Memory and resource usage
- Queue depth and throughput

## Scaling Patterns

### Vertical Scaling

Increase worker concurrency within single process:

- Higher concurrency limits
- More memory allocation
- Faster storage backend

### Horizontal Scaling

Distribute workers across processes:

- External worker deployment
- Load balancer coordination
- Independent worker scaling
- Database connection pooling

### Hybrid Deployment

Combine embedded and external workers:

- Coordinator with embedded workers
- Additional external workers for peak load
- Flexible resource allocation
- Cost-optimized scaling
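As referenced under Job Distribution, the following is a simplified sketch of a FIFO, concurrency-limited dispatch loop. Names and structure are assumptions; the real PipelineManager also persists job state and handles cancellation and graceful shutdown.

```typescript
// Simplified sketch of concurrency-limited job dispatch (hypothetical names).
type QueuedJob = { id: string; run: () => Promise<void> };

class WorkerPool {
  private queue: QueuedJob[] = []; // FIFO ordering
  private active = 0;

  constructor(private concurrency = 3) {} // default mirrors the docs

  enqueue(job: QueuedJob): void {
    this.queue.push(job);
    this.dispatch();
  }

  // Start queued jobs while a worker slot is available.
  private dispatch(): void {
    while (this.active < this.concurrency && this.queue.length > 0) {
      const job = this.queue.shift()!;
      this.active++;
      job
        .run()
        .catch(() => {
          /* mark the job FAILED; omitted in this sketch */
        })
        .finally(() => {
          this.active--;
          this.dispatch(); // pick up the next queued job
        });
    }
  }
}
```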
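And as referenced under Procedures (tRPC), a minimal router sketch using the procedure names above might look like the following. The input/output shapes and the in-memory store are illustrative assumptions, not the project's actual data contracts.

```typescript
import { initTRPC } from "@trpc/server";
import { randomUUID } from "node:crypto";
import { z } from "zod";

const t = initTRPC.create();

const jobStatus = z.enum([
  "QUEUED",
  "RUNNING",
  "COMPLETED",
  "FAILED",
  "CANCELLED",
]);
type JobStatus = z.infer<typeof jobStatus>;
interface Job {
  id: string;
  status: JobStatus;
  url: string;
  library: string;
}

const jobs = new Map<string, Job>(); // stand-in for the database-backed queue

export const pipelineRouter = t.router({
  // Connectivity check
  ping: t.procedure.query(() => "pong"),

  // Create new job
  enqueueJob: t.procedure
    .input(z.object({ url: z.string().url(), library: z.string() }))
    .mutation(({ input }) => {
      const job: Job = { id: randomUUID(), status: "QUEUED", ...input };
      jobs.set(job.id, job);
      return { jobId: job.id };
    }),

  // List jobs with optional status filtering
  getJobs: t.procedure
    .input(z.object({ status: jobStatus.optional() }).optional())
    .query(({ input }) =>
      [...jobs.values()].filter(
        (j) => !input?.status || j.status === input.status,
      ),
    ),

  // Get job details
  getJob: t.procedure
    .input(z.object({ jobId: z.string() }))
    .query(({ input }) => jobs.get(input.jobId) ?? null),

  // Cancel a job (only pending or in-flight jobs can be cancelled)
  cancelJob: t.procedure
    .input(z.object({ jobId: z.string() }))
    .mutation(({ input }) => {
      const job = jobs.get(input.jobId);
      if (job && (job.status === "QUEUED" || job.status === "RUNNING")) {
        job.status = "CANCELLED";
      }
      return { cancelled: job?.status === "CANCELLED" };
    }),

  // Remove completed/cancelled/failed jobs
  clearCompletedJobs: t.procedure.mutation(() => {
    let removed = 0;
    for (const [id, job] of jobs) {
      if (["COMPLETED", "CANCELLED", "FAILED"].includes(job.status)) {
        jobs.delete(id);
        removed++;
      }
    }
    return { removed };
  }),
});

export type PipelineRouter = typeof pipelineRouter;
```

Because PipelineClient is a tRPC client typed against this router (for example via `createTRPCProxyClient<PipelineRouter>` from `@trpc/client`), request and response types stay shared end-to-end, as described under Data Contracts.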
