Skip to main content
Glama
BackgroundProcessor.ts15.9 kB
import { FileProcessor } from './FileProcessor.js'; import { TextChunker } from './TextChunker.js'; import { EmbeddingService } from './EmbeddingService.js'; import { VectorIndex } from './VectorIndex.js'; import { JobManager } from './JobManager.js'; import { log } from './Logger.js'; import { getMcpPaths, ensureDirectoryExists, extractRepoName } from './PathUtils.js'; import { DatabaseSchema } from './DatabaseSchema.js'; import { runCli } from 'repomix'; import { promises as fs } from 'fs'; import * as path from 'path'; import { exec } from 'child_process'; import { promisify } from 'util'; import { FileProcessingError, EmbeddingError, StorageError, DocumentChunk } from '../types/index.js'; interface FileDownloadOptions { overwrite?: boolean; indexAfterSave?: boolean; maxFileSizeMB?: number; } interface RepoDownloadOptions { includePatterns?: string[]; excludePatterns?: string[]; outputStyle?: 'markdown'; removeComments?: boolean; showLineNumbers?: boolean; } /** * Handles background document processing operations using event loop yielding */ export class BackgroundProcessor { private jobManager: JobManager; constructor() { this.jobManager = JobManager.getInstance(); log.info('BackgroundProcessor initialized'); } /** * Process repository fetch in background with accurate progress */ async processRepoFetch( jobId: string, repoUrl: string, branch?: string, options: RepoDownloadOptions = {} ): Promise<void> { try { // Step 1: Download repository (0-30%) this.jobManager.updateProgress(jobId, 5, 'Initializing repository download...'); const repoName = extractRepoName(repoUrl); const mcpPaths = getMcpPaths(); const outputDir = path.join(mcpPaths.repositories, repoName); this.jobManager.updateProgress(jobId, 10, 'Creating output directory...'); await ensureDirectoryExists(outputDir, 'Repository output directory'); this.jobManager.updateProgress(jobId, 15, 'Downloading repository with repomix...'); const filePath = await this.downloadRepo(jobId, repoUrl, branch, outputDir, options); // Step 2: Process file (30-100%) await this.processFile(jobId, filePath, 30, 70); this.jobManager.completeJob(jobId, { success: true, repoName, filePath, outputDir }); } catch (error: any) { log.error('Repository fetch failed', error, { jobId, repoUrl }); this.jobManager.failJob(jobId, error.message); } } /** * Process file fetch in background with accurate progress */ async processFileFetch( jobId: string, url: string, filename: string, options: FileDownloadOptions = {} ): Promise<void> { try { // Step 1: Download file (0-40%) this.jobManager.updateProgress(jobId, 5, 'Initializing file download...'); const targetFolder = getMcpPaths().fetched; await ensureDirectoryExists(targetFolder, 'File download directory'); this.jobManager.updateProgress(jobId, 10, 'Starting file download...'); const filePath = await this.downloadFile(jobId, url, filename, targetFolder, options); // Step 2: Process file (40-100%) if (options.indexAfterSave !== false) { await this.processFile(jobId, filePath, 40, 60, options.maxFileSizeMB); } else { this.jobManager.updateProgress(jobId, 100, 'File download completed (indexing skipped)'); } this.jobManager.completeJob(jobId, { success: true, filePath, size: (await fs.stat(filePath)).size }); } catch (error: any) { log.error('File fetch failed', error, { jobId, url, filename }); this.jobManager.failJob(jobId, error.message); } }   private async processFile(     jobId: string,     filePath: string,     startProgress: number,     progressRange: number,     maxFileSizeMB?: number   ): Promise<void> {     const fileProcessor = new FileProcessor(maxFileSizeMB);     const textChunker = new TextChunker();     const embeddingService = await EmbeddingService.getInstance(); // Use ServiceLocator for efficient shared instance management const { ServiceLocator } = await import('./ServiceLocator.js'); const serviceLocator = ServiceLocator.getInstance(); const vectorIndex = serviceLocator.getVectorIndex(); try { this.jobManager.updateProgress(jobId, startProgress, 'Reading file content...'); const text = await fileProcessor.extractText(filePath); if (text.trim().length === 0) { throw new FileProcessingError(`File ${filePath} is empty`); } this.jobManager.updateProgress(jobId, startProgress + (progressRange * 0.1), 'Chunking text with event loop yielding...'); const chunks = await textChunker.chunkText(text, filePath, { chunkSize: 1000, overlap: 200, method: 'fixed' }); if (chunks.length === 0) { throw new FileProcessingError(`No chunks created for file ${filePath}`); } this.jobManager.updateProgress(jobId, startProgress + (progressRange * 0.2), `Created ${chunks.length} chunks with yielding`); this.jobManager.updateProgress(jobId, startProgress + (progressRange * 0.2), 'Generating embeddings...'); const embeddedChunks = await this.generateEmbeddings(jobId, chunks, startProgress + (progressRange * 0.2), progressRange * 0.6); this.jobManager.updateProgress(jobId, startProgress + (progressRange * 0.8), 'Storing chunks in database...'); const storedCount = await vectorIndex.storeChunks(embeddedChunks); if (storedCount === 0) { throw new StorageError(`Failed to store chunks for ${filePath}`); } this.jobManager.updateProgress(jobId, startProgress + progressRange, `Indexed ${storedCount} chunks successfully`); // Note: Don't close shared instance, it will be reused } catch (error: any) { throw new StorageError(`File processing failed for ${filePath}: ${error.message}`, error); } } private async generateEmbeddings( jobId: string, chunks: DocumentChunk[], startProgress: number, progressRange: number ): Promise<DocumentChunk[]> { log.debug('Processing embeddings with event loop yielding for MCP responsiveness', { jobId, chunkCount: chunks.length, strategy: 'event-loop-yielding' }); // Use event loop yielding for TensorFlow compatibility // Process small batches with setImmediate() yielding between each batch // This keeps MCP server responsive during embedding generation const embeddingService = await EmbeddingService.getInstance(); const batchSize = 10; // Small batches to enable frequent yielding const results: DocumentChunk[] = []; const totalBatches = Math.ceil(chunks.length / batchSize); for (let batchIndex = 0; batchIndex < totalBatches; batchIndex++) { const start = batchIndex * batchSize; const end = Math.min(start + batchSize, chunks.length); const batch = chunks.slice(start, end); log.debug(`Processing embedding batch ${batchIndex + 1}/${totalBatches}`, { batchSize: batch.length, totalProcessed: end, remaining: chunks.length - end }); // Process this batch const embeddedBatch = await embeddingService.generateEmbeddings(batch); results.push(...embeddedBatch); // Update progress after each batch const progress = ((batchIndex + 1) / totalBatches) * progressRange; this.jobManager.updateProgress( jobId, startProgress + progress, `Processed ${end}/${chunks.length} chunks (${Math.round(progress * 100 / progressRange)}% of embedding phase)` ); // IMPORTANT: Yield control back to event loop after each batch // This allows MCP server to handle other requests and prevents blocking await new Promise(resolve => setImmediate(resolve)); log.debug(`Yielded control back to event loop after batch ${batchIndex + 1}/${totalBatches}`); } log.debug('Embedding generation completed with event loop yielding', { totalChunks: chunks.length, totalBatches, strategy: 'event-loop-yielding' }); return results; } private async downloadRepo( jobId: string, repoUrl: string, branch: string | undefined, outputDir: string, options: RepoDownloadOptions ): Promise<string> { const includePatterns = options.includePatterns || ['**/*.md', '**/*.mdx', '**/*.txt', '**/*.json', '**/*.rst', '**/*.yml', '**/*.yaml']; const excludePatterns = [ ...(options.excludePatterns || []), 'node_modules/**', '.git/**', '__pycache__', '*.pyc', 'dist/**', 'build/**', '.DS_Store', 'Thumbs.db' ]; const include = includePatterns.join(','); const exclude = excludePatterns.join(','); const repoName = extractRepoName(repoUrl); const outputFile = path.join(outputDir, `${repoName}.md`); // Try repomix remote approach first try { this.jobManager.updateProgress(jobId, 20, 'Processing repository with repomix...'); await runCli(['.'], outputDir, { remote: repoUrl, remoteBranch: branch, output: outputFile, include, ignore: exclude, style: options.outputStyle || 'markdown', removeComments: options.removeComments || false, removeEmptyLines: false, outputShowLineNumbers: options.showLineNumbers || true, topFilesLen: 0, compress: false, quiet: false, verbose: false }); this.jobManager.updateProgress(jobId, 30, 'Repository download completed'); return outputFile; } catch (error: any) { log.debug('Repomix remote download failed, trying fallback clone', { error: error.message, repoUrl }); // For Azure DevOps and other private repos, fallback to manual clone if access error const isAccessError = error.message.includes('Authentication failed') || error.message.includes('403') || error.message.includes('401') || error.message.includes('404') || error.message.includes('Invalid remote repository URL') || error.message.includes('credential') || error.message.includes('password') || error.message.includes('Permission denied'); if (isAccessError) { log.info('Detected possible authentication/access error, falling back to manual Git clone'); return await this.downloadRepoWithFallback(jobId, repoUrl, branch, outputDir, options, include, exclude, repoName, outputFile); } // Re-throw if not an auth-related error throw error; } } private async downloadRepoWithFallback( jobId: string, repoUrl: string, branch: string | undefined, outputDir: string, options: RepoDownloadOptions, include: string, exclude: string, repoName: string, outputFile: string ): Promise<string> { try { this.jobManager.updateProgress(jobId, 18, 'Access error, falling back to manual Git clone...'); const mcpPaths = getMcpPaths(); const tempCloneDir = path.join(mcpPaths.temp, `clone_${Date.now()}_${repoName}`); await ensureDirectoryExists(tempCloneDir, 'Temp clone directory'); this.jobManager.updateProgress(jobId, 22, 'Cloning repository manually...'); // Clone repository using Git (relies on user's configured authentication) const execAsync = promisify(exec); const cloneCommand = `git clone ${branch ? `-b ${branch} ` : ''}"${repoUrl}" "${tempCloneDir}"`; await execAsync(cloneCommand); this.jobManager.updateProgress(jobId, 26, 'Repository cloned, processing with repomix...'); // Run repomix on the local cloned directory await runCli([tempCloneDir], outputDir, { output: outputFile, include, ignore: exclude, style: options.outputStyle || 'markdown', removeComments: options.removeComments || false, removeEmptyLines: false, outputShowLineNumbers: options.showLineNumbers || true, topFilesLen: 0, compress: false, quiet: false, verbose: false }); this.jobManager.updateProgress(jobId, 30, 'Repository download completed'); // Cleanup temp directory try { await fs.rm(tempCloneDir, { recursive: true, force: true }); log.debug('Cleaned up temp clone directory', { tempCloneDir }); } catch (cleanupError) { log.warn('Failed to cleanup temp clone directory', { tempCloneDir, cleanupError }); } return outputFile; } catch (error: any) { log.error('Manual clone fallback failed', error, { repoUrl, branch, tempDir: path.join(getMcpPaths().temp, `clone_*.${repoName}`) }); throw error; } } private async downloadFile( jobId: string, url: string, filename: string, targetFolder: string, options: FileDownloadOptions ): Promise<string> { const https = await import('https'); const http = await import('http'); return new Promise((resolve, reject) => { const protocol = url.startsWith('https:') ? 'https' : 'http'; const maxSize = (options.maxFileSizeMB || 1024) * 1024 * 1024; const request = protocol === 'https' ? https.get(url, { timeout: 30000 }) : http.get(url, { timeout: 30000 }); let data = ''; let downloadedBytes = 0; let totalBytes = 0; request.on('timeout', () => { request.destroy(); reject(new Error('Download timeout')); }); request.on('error', (error: any) => { reject(new Error(`Network error: ${error.message}`)); }); request.on('response', (res: any) => { if (res.statusCode !== 200) { res.destroy(); reject(new Error(`HTTP ${res.statusCode}: ${url}`)); return; } totalBytes = parseInt(res.headers['content-length'] || '0'); res.on('data', (chunk: any) => { data += chunk; downloadedBytes += chunk.length; if (totalBytes > 0) { const downloadProgress = (downloadedBytes / totalBytes) * 30; this.jobManager.updateProgress( jobId, 10 + downloadProgress, `Downloaded ${(downloadedBytes / 1024).toFixed(1)}KB of ${(totalBytes / 1024).toFixed(1)}KB` ); } if (data.length > maxSize) { res.destroy(); reject(new Error(`File size exceeds ${options.maxFileSizeMB}MB limit`)); } }); res.on('end', async () => { try { const safeFilename = this.sanitizeFilename(filename); const filePath = path.join(targetFolder, safeFilename); const exists = await this.fileExists(filePath); if (exists && !options.overwrite) { throw new Error(`File ${filePath} already exists and overwrite=false`); } await fs.writeFile(filePath, data, 'utf-8'); this.jobManager.updateProgress(jobId, 40, 'File download completed'); resolve(filePath); } catch (error: any) { reject(error); } }); res.on('error', (error: any) => { reject(new Error(`Download failed: ${error.message}`)); }); }); }); } private sanitizeFilename(filename: string): string { return filename .replace(/[^a-zA-Z0-9._-]/g, '_') .replace(/_{2,}/g, '_') .replace(/^_+|_+$/g, '') .substring(0, 255); } private async fileExists(filePath: string): Promise<boolean> { try { await fs.access(filePath); return true; } catch { return false; } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PatrickRuddiman/local-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server