document-processor.service.ts (25.7 kB)
import { convertHtmlToMarkdown } from 'dom-to-semantic-markdown';
import { JSDOM } from 'jsdom';
import { getPrismaClient as getMainPrismaClient } from '../config/database';
import { PrismaClient, Prisma } from '../generated/prisma';
import logger from '../utils/logger';
import { ChunkService } from './chunk.service';
import { DocumentService } from './document.service';
import axios from 'axios';
import config from '../config'; // Import the config
import { URL } from 'url'; // Import URL for robust path joining
// We'll dynamically import AWS dependencies to avoid errors if not installed
import { BedrockRuntimeClient } from '@aws-sdk/client-bedrock-runtime';
import { BedrockEmbeddings } from '@langchain/aws';
import matter from 'gray-matter'; // We'll use gray-matter to parse frontmatter
import { MarkdownTextSplitter } from "@langchain/textsplitters"; // Import MarkdownTextSplitter
import { DocumentationMapperService } from './documentation-mapper.service';
import { PackageMetadata } from './document.service';

// Define custom error class at the top level or within the class scope
class ChunkTooLargeError extends Error {
  originalError: any;

  constructor(message: string, originalError?: any) {
    super(message);
    this.name = 'ChunkTooLargeError';
    this.originalError = originalError;
    // Ensure the prototype chain is set correctly
    Object.setPrototypeOf(this, ChunkTooLargeError.prototype);
  }
}

// Export the custom error if needed elsewhere (optional)
// export { ChunkTooLargeError };

// Define the metadata structure
interface MetadataExtracted {
  title?: string; // Made optional as it might come from frontmatter
  headings?: { level: number; text: string; id?: string }[]; // Keep or adjust based on library output
  codeBlocks?: { language: string; code: string }[]; // Keep or adjust based on library output
  tableCount?: number; // Keep or adjust based on library output
  sectionCount?: number; // Keep or adjust based on library output
  links?: { text: string; href: string }[]; // Keep or adjust based on library output
  package?: string;
  version?: string;
  type?: string;
  tags?: string[];
  // Add fields expected from dom-to-semantic-markdown's extended metadata if needed
  openGraph?: Record<string, any>;
  twitter?: Record<string, any>;
  jsonLd?: Record<string, any>;
  // ... any other fields provided by the library's frontmatter
}

// Define the document chunk interface
interface DocumentChunk {
  content: string;
  metadata: {
    title: string;
    sectionHeading?: string;
    parentHeading?: string;
    headingLevel?: number;
    sectionIndex?: number;
    subsectionIndex?: number;
    language?: string;
    chunkIndex?: number;
    start?: number;
    end?: number;
    order: number;
    type: 'content' | 'code' | 'table';
  };
}

export class DocumentProcessorService {
  private prisma: PrismaClient;
  private chunkService: ChunkService;
  private documentService: DocumentService;
  private bedrockClient?: BedrockRuntimeClient; // Type will be BedrockRuntimeClient when imported
  private embeddings?: BedrockEmbeddings; // Type will be BedrockEmbeddings when imported

  constructor(prismaClient?: PrismaClient, documentService?: DocumentService, chunkService?: ChunkService) {
    this.prisma = prismaClient || getMainPrismaClient();
    this.chunkService = chunkService || new ChunkService(this.prisma);
    this.documentService = documentService || new DocumentService(this.prisma);

    // Initialize AWS Bedrock client if configured and AWS is enabled
    this.initializeBedrockClient();
  }

  /**
   * Initialize AWS Bedrock client if the necessary configuration exists
   * and the AWS SDK is available
   */
  private initializeBedrockClient(): void {
    try {
      // The AWS config is already in the config file
      const awsConfig = config.aws;
      const embedProvider = config.embedding?.provider;

      // Add debug log to show the current AWS configuration
      logger.debug(`AWS Bedrock configuration: ${JSON.stringify({
        provider: embedProvider,
        region: awsConfig?.region,
        accessKeyIdPresent: !!awsConfig?.accessKeyId,
        secretAccessKeyPresent: !!awsConfig?.secretAccessKey,
        accessKeyIdIsDefault: awsConfig?.accessKeyId === 'your-access-key-id',
        secretAccessKeyIsDefault: awsConfig?.secretAccessKey === 'your-secret-access-key',
        embeddingDimensions: config.aws?.embeddingDimensions,
      })}`);

      if (embedProvider !== 'bedrock' ||
          !awsConfig ||
          !awsConfig.accessKeyId ||
          !awsConfig.secretAccessKey ||
          awsConfig.accessKeyId === 'your-access-key-id' || // Check for default placeholder values
          awsConfig.secretAccessKey === 'your-secret-access-key') {
        logger.warn('AWS Bedrock embedding provider is not properly configured. Please check your environment variables.');
        return;
      }

      // Import the required libraries
      const { BedrockRuntimeClient } = require('@aws-sdk/client-bedrock-runtime');
      const { BedrockEmbeddings } = require('@langchain/aws');

      // Initialize the Bedrock client
      this.bedrockClient = new BedrockRuntimeClient({
        region: awsConfig.region,
        credentials: {
          accessKeyId: awsConfig.accessKeyId,
          secretAccessKey: awsConfig.secretAccessKey
        },
      });

      // Initialize Bedrock embeddings with the chosen model
      this.embeddings = new BedrockEmbeddings({
        client: this.bedrockClient,
        model: 'amazon.titan-embed-text-v1', // Default to this model if not specified
      });

      logger.info('AWS Bedrock embedding client initialized successfully');
    } catch (error) {
      logger.error('Failed to initialize AWS Bedrock client:', error);
      logger.warn('Make sure you have installed @aws-sdk/client-bedrock-runtime and @langchain/aws packages');
    }
  }

  /**
   * Create an embedding for a text string using AWS Bedrock
   * with retry logic to handle transient failures.
   * Chunking should ideally happen before this based on tokens;
   * oversized inputs raise a ChunkTooLargeError so the caller can re-chunk.
   */
  async createEmbedding(text: string): Promise<number[]> {
    const maxRetries = 3;
    let retryDelay = 1000;

    if (!this.embeddings) {
      throw new Error('AWS Bedrock embedding client not initialized. Check your configuration.');
    }

    // const estimatedTokens = Math.ceil(text.length / 3); // Removed estimation logic
    let processedText = text; // Start with the original text passed in

    // Retry logic remains the same, Bedrock might still reject if actual token count is too high
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        logger.debug(`Sending embedding request to AWS Bedrock, attempt ${attempt} with ${processedText.length} chars`);
        const result = await this.embeddings.embedQuery(processedText);

        // Verify dimensions match what we expect
        if (config.aws?.embeddingDimensions && result.length !== config.aws.embeddingDimensions) {
          logger.warn(`Expected ${config.aws.embeddingDimensions} dimensions but got ${result.length}`);
        }

        return result;
      } catch (error: any) {
        const statusCode = error?.$metadata?.httpStatusCode;

        // Check specifically for size-related errors (like 400 Bad Request with token messages, or 413 Payload Too Large)
        const isSizeError = (
          statusCode === 400 &&
          error.message &&
          (error.message.includes("Too many input tokens") ||
            error.message.includes("token limit") ||
            error.message.includes("token count") ||
            error.message.toLowerCase().includes("too large"))
        ) || statusCode === 413; // Consider 413 as a size error too

        logger.warn(`AWS Bedrock embedding request failed (Attempt ${attempt}/${maxRetries})`, {
          errorMessage: error.message,
          statusCode: statusCode,
          isSizeError: isSizeError, // Log if it's identified as a size error
          textLength: processedText.length,
        });

        // If it's a size error, throw specific error immediately, do not retry here
        if (isSizeError) {
          logger.error(`Chunk identified as too large for Bedrock model (HTTP ${statusCode}). Text length: ${processedText.length}. Error: ${error.message}. Triggering re-chunk attempt.`);
          // Throw the specific error to be caught by the caller
          throw new ChunkTooLargeError(`Chunk too large for embedding model (HTTP ${statusCode}): ${error.message}`, error);
        }

        // For other potentially transient errors, retry
        if (attempt === maxRetries) {
          logger.error(`AWS Bedrock embedding request failed after ${maxRetries} attempts (non-size error).`, {
            finalErrorMessage: error.message,
            // Add other relevant final error details if needed
          });
          // Re-throw the original error for final non-size failures
          throw error;
        }

        // Exponential backoff for retries (only for non-size errors)
        const backoffTime = retryDelay * Math.pow(2, attempt - 1);
        logger.debug(`Retrying embedding request after ${backoffTime}ms delay (non-size error).`);
        await new Promise(resolve => setTimeout(resolve, backoffTime));
      }
    }

    // This point should theoretically not be reached due to throws in the loop
    throw new Error('Embedding generation failed unexpectedly after multiple retries.');
  }

  /**
   * Process an HTML document and convert it to a clean markdown format
   */
  async processDocument(documentId: string, html: string, metadata: any = {}): Promise<string> {
    try {
      // 1. Convert HTML to Markdown with extended metadata using dom-to-semantic-markdown
      // Need to provide a DOMParser implementation for Node.js
      const dom = new JSDOM(html);
      const markdownWithFrontmatter = convertHtmlToMarkdown(html, {
        includeMetaData: 'extended',
        overrideDOMParser: new dom.window.DOMParser(), // Provide JSDOM's parser
        extractMainContent: true, // Optional: Attempt to extract main content if desired
        enableTableColumnTracking: true // Optional: Enable if useful for LLM context with tables
      });
      logger.debug('HTML converted to Markdown with frontmatter.');

      // 2. Parse the frontmatter and separate content
      const { data: frontmatterMetadata, content: markdownContent } = matter(markdownWithFrontmatter);
      logger.debug('Parsed frontmatter from Markdown output.');

      // 3. Merge extracted metadata with provided metadata
      // Frontmatter data takes precedence if keys conflict
      const mergedMetadata: MetadataExtracted = {
        ...metadata, // Start with user-provided metadata
        ...frontmatterMetadata, // Overlay frontmatter data
        // Ensure 'title' exists, prioritize frontmatter, then user meta, then default
        title: frontmatterMetadata.title || metadata.title || 'Untitled Document',
        // You might need to map/transform other fields from frontmatterMetadata
        // if they don't directly match MetadataExtracted structure.
        // For example, if dom-to-semantic-markdown puts headings/links in frontmatter:
        // headings: frontmatterMetadata.headings || [],
        // links: frontmatterMetadata.links || [],
        // Add default type/tags if not present
        type: frontmatterMetadata.type || metadata.type || 'documentation',
        tags: frontmatterMetadata.tags || metadata.tags || ['auto-generated']
      };

      // 4. Chunk the resulting Markdown content using MarkdownTextSplitter
      const chunks = await this.chunkDocument(markdownContent.trim(), mergedMetadata); // Now async

      // 5. Create embeddings for chunks and store them
      await this.createChunkEmbeddings(chunks, documentId);

      // 6. Update the document record with the final generated Markdown content and merged metadata
      await this.documentService.updateDocument(documentId, {
        // Optionally store the pure markdown content if needed
        // contentMarkdown: markdownContent.trim(),
        metadata: mergedMetadata as any,
      });

      logger.info(`Document ${documentId} processed successfully. Updated metadata.`);

      // Return the generated markdown content (without frontmatter)
      // Or return markdownWithFrontmatter if you need the full output elsewhere
      return markdownContent.trim();
    } catch (error) {
      logger.error(`Error processing document ${documentId}:`, error);
      throw error; // Re-throw to handle at job level
    }
  }

  /**
   * Split document into chunks using MarkdownTextSplitter with character-based sizing.
   */
  private async chunkDocument(markdown: string, metadata: any): Promise<DocumentChunk[]> {
    // Ensure markdown content is provided before chunking
    if (!markdown || typeof markdown !== 'string' || markdown.trim().length === 0) {
      logger.warn(`Markdown content is empty or invalid for document "${metadata.title || 'Untitled'}". Skipping chunking.`);
      return [];
    }

    // --- MarkdownTextSplitter Configuration ---
    // Use character counts, aiming for logical segments with token size safety
    // Default chunk size of 1000 characters (~250 tokens) is very conservative
    const targetCharacterChunkSize = config.chunking?.markdownChunkSize ?? 1000;
    const targetCharacterChunkOverlap = config.chunking?.markdownChunkOverlap ?? 100;

    // Calculate a more conservative chunk size (approx 6000 tokens max for 8192 limit)
    // This gives a ~25% safety margin for token count estimation inaccuracies
    // Average English text has roughly 4 characters per token
    const maxSafeTokens = 6000;
    const approxCharsPerToken = 4;
    const maxSafeChars = maxSafeTokens * approxCharsPerToken;

    // Use the smaller of the configured size or the max safe size
    const safeChunkSize = Math.min(targetCharacterChunkSize, maxSafeChars);

    logger.debug(`Using MarkdownTextSplitter strategy for document "${metadata.title || 'Untitled'}"`, {
      configuredChunkSize: targetCharacterChunkSize,
      maxSafeChars: maxSafeChars,
      actualChunkSize: safeChunkSize,
      chunkOverlap: targetCharacterChunkOverlap,
    });

    try {
      // Use MarkdownTextSplitter instead
      const splitter = new MarkdownTextSplitter({
        // encodingName: encodingName, // Not applicable
        chunkSize: safeChunkSize,
        chunkOverlap: targetCharacterChunkOverlap,
      });

      const textChunks = await splitter.splitText(markdown);
      logger.info(`Split document "${metadata.title || 'Untitled'}" into ${textChunks.length} chunks using Markdown structure.`);

      // Map the text chunks to our DocumentChunk structure
      const documentChunks: DocumentChunk[] = textChunks.map((text, index) => ({
        content: text,
        metadata: {
          title: metadata.title || 'Untitled Document',
          chunkIndex: index, // Index of the chunk within the document
          order: index, // Simple sequential order
          type: 'content' // Assuming all are content chunks for now
          // Note: start/end character indices are no longer relevant
          // Other metadata like headings could be added if a more complex splitter is used
        }
      }));

      return documentChunks;
    } catch (error) {
      logger.error(`Error splitting document "${metadata.title || 'Untitled'}" by Markdown structure:`, error);
      // Fallback or throw error? Let's throw for now.
      throw new Error(`Failed to split document by Markdown structure: ${error instanceof Error ? error.message : String(error)}`);
    }
  }

  /**
   * Create embeddings for document chunks and store them in the database.
   * Handles re-chunking if a chunk is too large for the embedding model.
   * @param chunks The initial document chunks to create embeddings for
   * @param documentId The ID of the parent document
   */
  private async createChunkEmbeddings(chunks: DocumentChunk[], documentId: string): Promise<void> {
    // Main try block for the entire function
    try {
      if (!chunks || chunks.length === 0) {
        logger.info(`No chunks provided for document ${documentId}. Skipping embedding generation.`);
        return;
      }

      if (!this.embeddings) {
        logger.warn(`AWS Bedrock embedding client not initialized. Skipping embedding generation for document ${documentId}. Check your configuration.`);
        return;
      }

      const finalChunksToSave: Array<{ documentId: string; content: string; embedding: number[]; metadata: any }> = [];
      let initialSuccessCount = 0;
      let initialErrorCount = 0;
      let rechunkAttemptCount = 0;
      let subChunkSuccessCount = 0;
      let subChunkErrorCount = 0;

      logger.info(`Starting embedding generation for ${chunks.length} initial chunks from document ${documentId}`);

      // Iterate directly over all chunks sequentially
      for (const chunk of chunks) {
        if (!chunk.content || typeof chunk.content !== 'string' || chunk.content.trim().length === 0) {
          logger.warn(`Skipping empty or invalid initial chunk (order: ${chunk.metadata.order}) for document ${documentId}`);
          continue;
        }

        try {
          // --- Attempt to embed the original chunk ---
          logger.debug(`Attempting embedding for initial chunk ${initialSuccessCount + initialErrorCount + 1}/${chunks.length} (order: ${chunk.metadata.order})`);

          // Add delay between individual requests except the very first one
          if (initialSuccessCount + initialErrorCount + rechunkAttemptCount > 0) {
            // Delay if not the first attempt overall
            await new Promise(resolve => setTimeout(resolve, 200)); // 200ms delay
          }

          const embedding = await this.createEmbedding(chunk.content);

          finalChunksToSave.push({
            documentId,
            content: chunk.content,
            embedding: embedding,
            metadata: chunk.metadata // Keep original metadata
          });
          initialSuccessCount++;
        } catch (error) {
          // --- Handle errors, including potential re-chunking ---
          if (error instanceof ChunkTooLargeError) {
            initialErrorCount++; // Count the initial failure
            rechunkAttemptCount++;
            logger.warn(`Initial chunk (order: ${chunk.metadata.order}) too large. Attempting to re-chunk.`);

            // --- Re-chunking Logic ---
            try {
              // Define smaller chunk parameters based on the CHARACTER size
              const targetCharacterChunkSize = config.chunking?.markdownChunkSize ?? 1500; // Get the primary character size

              // Be much more aggressive with sub-chunk sizing - approximately 1/4 the size
              // For 8,192 token limit with ~4 chars per token, aim for ~5,000 tokens max
              const approxCharsPerToken = 4;
              const maxTokensForSubChunk = 4000; // Even more conservative (5000 -> 4000)
              const maxCharsForSubChunk = maxTokensForSubChunk * approxCharsPerToken;

              // Take the smaller of calculated size or 1/5 of original config (instead of 1/4)
              const subChunkSize = Math.min(
                maxCharsForSubChunk,
                Math.floor(targetCharacterChunkSize / 5)
              );

              // Use minimal overlap for sub-chunks to avoid token waste
              const subChunkOverlap = Math.min(100, Math.floor(subChunkSize * 0.05)); // 5% overlap, max 100 chars

              logger.debug(`Re-chunking with parameters: size=${subChunkSize}, overlap=${subChunkOverlap}, original chunk size=${chunk.content.length} chars`);

              const subSplitter = new MarkdownTextSplitter({
                chunkSize: subChunkSize,
                chunkOverlap: subChunkOverlap,
              });

              const subTextChunks = await subSplitter.splitText(chunk.content); // Split the *original* large chunk content
              logger.info(`Re-split large chunk (order: ${chunk.metadata.order}) into ${subTextChunks.length} sub-chunks.`);

              // Attempt to embed each sub-chunk
              for (let subIndex = 0; subIndex < subTextChunks.length; subIndex++) {
                const subText = subTextChunks[subIndex];
                if (!subText || subText.trim().length === 0) continue; // Skip empty sub-chunks

                try {
                  logger.debug(`Attempting embedding for sub-chunk ${subIndex + 1}/${subTextChunks.length} (from original order: ${chunk.metadata.order})`);

                  // Add delay between sub-chunk attempts
                  if (subIndex > 0) {
                    await new Promise(resolve => setTimeout(resolve, 100)); // Smaller delay
                  }

                  const subEmbedding = await this.createEmbedding(subText); // Call createEmbedding again

                  // Adjust metadata for the sub-chunk
                  const subMetadata = {
                    ...chunk.metadata, // Copy original metadata
                    order: parseFloat(`${chunk.metadata.order}.${subIndex + 1}`), // Append sub-index to order
                    subChunkIndex: subIndex, // Add sub-chunk index
                    originalChunkOrder: chunk.metadata.order, // Reference original order
                    // Remove potentially confusing fields inherited from parent chunk?
                    // start: undefined, // Character indices are not applicable
                    // end: undefined,
                    chunkIndex: undefined // Main chunkIndex doesn't apply here
                  };

                  finalChunksToSave.push({
                    documentId,
                    content: subText,
                    embedding: subEmbedding,
                    metadata: subMetadata
                  });
                  subChunkSuccessCount++;
                } catch (subError: any) {
                  subChunkErrorCount++;
                  logger.error(`Failed to embed sub-chunk ${subIndex + 1}/${subTextChunks.length} (from original order: ${chunk.metadata.order}): ${subError.message}`, { subTextStart: subText.substring(0, 50) + '...' });
                  // Decide if failure of a sub-chunk is critical. For now, just log and continue.
                }
              } // End of sub-chunk loop
            } catch (rechunkError) {
              // Error during the re-chunking process itself (e.g., splitter error)
              logger.error(`Failed to re-chunk original chunk (order: ${chunk.metadata.order}): ${rechunkError}`);
              // Original chunk failed, and re-chunking also failed.
            }
            // --- End Re-chunking Logic ---
          } else {
            // Handle other errors from createEmbedding (non-size related)
            initialErrorCount++;
            logger.error(`Failed to generate embedding for initial chunk (order: ${chunk.metadata.order}) due to non-size error: ${error instanceof Error ? error.message : String(error)}`);
            // Optional: Implement backoff here if needed for consecutive non-size errors
          }
        } // End of main try-catch for the chunk
      } // End of loop iterating directly over chunks

      // --- Final Save --- (Ensure this is inside the main try block)
      if (finalChunksToSave.length > 0) {
        // Sort by potentially modified order just in case
        finalChunksToSave.sort((a: any, b: any) => a.metadata.order - b.metadata.order); // Added types for clarity
        await this.chunkService.createManyChunks(finalChunksToSave, documentId);
        logger.info(`Saved ${finalChunksToSave.length} final chunks for document ${documentId}.`);
      } else {
        logger.warn(`No chunks were successfully embedded and saved for document ${documentId}.`);
      }

      // --- Final Logging --- (Ensure this is inside the main try block)
      logger.info(`Embedding generation complete for document ${documentId}:`);
      logger.info(` Initial Chunks: ${chunks.length} total, ${initialSuccessCount} succeeded, ${initialErrorCount} failed.`);
      if (rechunkAttemptCount > 0) {
        logger.info(` Re-chunking Attempts: ${rechunkAttemptCount} large chunks triggered re-chunking.`);
        logger.info(` Sub-Chunks Created: ${subChunkSuccessCount + subChunkErrorCount} total sub-chunks processed.`);
        logger.info(` Sub-Chunks Succeeded: ${subChunkSuccessCount} successfully embedded.`);
        logger.info(` Sub-Chunks Failed: ${subChunkErrorCount} failed embedding.`);
      }
      logger.info(` Total chunks saved to DB: ${finalChunksToSave.length}`);

      // Main catch block for the entire function
    } catch (error: any) {
      logger.error(`Error creating embeddings for document ${documentId}:`, error);
      throw error; // Re-throw critical errors
    }
  }
}
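
For reference, a minimal usage sketch of the service defined above. The import path, document ID, and HTML input are illustrative assumptions; the sketch also assumes a document record with that ID already exists (since processDocument updates it via DocumentService) and that the 'bedrock' embedding provider and AWS credentials are set in the app config.

// Minimal usage sketch (assumptions: hypothetical import path, existing document
// record with id 'doc-123', and a configured 'bedrock' embedding provider).
import { DocumentProcessorService } from './services/document-processor.service';

async function main() {
  // With no arguments, the constructor falls back to the shared Prisma client
  // and default ChunkService / DocumentService instances.
  const processor = new DocumentProcessorService();

  const html = '<html><body><h1>Example</h1><p>Some documentation text.</p></body></html>';

  // Converts the HTML to Markdown, chunks it, embeds the chunks via Bedrock,
  // stores them, and returns the generated Markdown (without frontmatter).
  const markdown = await processor.processDocument('doc-123', html, {
    package: 'example-package',
    version: '1.0.0',
  });

  console.log(markdown);
}

main().catch(console.error);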
