Skip to main content
Glama
vector-rag-service.ts (21.7 kB)
import { z } from 'zod';
import { Result, ok, err as errResult } from 'neverthrow';
import { EmbeddingService, EmbeddingResult } from './embedding-service.js';
import { SQLiteClient, SearchResult, DocumentRecord, ChunkRecord } from './sqlite-client.js';
import { ConnectionPool, getConnectionPool } from '../db/connection-pool.js';

// Type-safe schemas with validation
const DocumentSchema = z.object({
  uri: z.string().min(1),
  title: z.string().min(1),
  content: z.string().min(1),
  metadata: z.record(z.string(), z.unknown()).optional()
});

const SearchQuerySchema = z.object({
  query: z.string().min(1),
  limit: z.number().int().positive().max(100).default(5),
  filters: z.record(z.string(), z.unknown()).optional(),
  threshold: z.number().min(0).max(1).default(0.7)
});

export type Document = z.infer<typeof DocumentSchema>;
export type SearchQuery = z.infer<typeof SearchQuerySchema>;

/** One matching chunk returned by {@link VectorRAGService.search}. */
export interface VectorSearchResult {
  readonly text: string;
  readonly section: string;
  readonly uri: string;
  readonly title: string;
  readonly similarity: number;
  readonly chunkId: number;
  readonly metadata?: Record<string, unknown>;
  readonly explanation?: string;
}

/** Aggregate counts and index health reported by {@link VectorRAGService.getDocumentStats}. */
export interface DocumentStats {
  readonly totalDocuments: number;
  readonly totalChunks: number;
  readonly totalEmbeddings: number;
  readonly modelName: string;
  readonly indexHealth: 'healthy' | 'degraded' | 'unhealthy';
  readonly lastIndexed: Date;
}

// Custom error types for better error handling
/** Error carried in the `err` branch of the `Result`s returned by {@link VectorRAGService}. */
export class VectorRAGError extends Error {
  constructor(
    message: string,
    public readonly code: string,
    public readonly cause?: Error
  ) {
    super(message);
    this.name = 'VectorRAGError';
  }
}

/** Row shape selected from `chunk_vecs` by `getAllChunkEmbeddings`. */
interface ChunkVectorRow {
  chunk_id: number;
  dim: number;
  vec: Buffer;
  model: string;
}

/** Row shape selected by `getChunkWithDocument`; `metadata` is the raw JSON text column. */
interface ChunkWithDocumentRow {
  text: string;
  section: string;
  uri: string;
  title: string;
  metadata: string | null;
}

/**
 * Stores documents with vector embeddings in SQLite and answers semantic-similarity searches.
 *
 * Most public methods return neverthrow `Result`s rather than throwing; the private helpers
 * throw, and the public callers convert those exceptions into {@link VectorRAGError}s.
 */
export class VectorRAGService {
  private readonly embeddingService: EmbeddingService;
  private readonly connectionPool: ConnectionPool;
  private readonly modelName: string = 'text-embedding-3-small';
  private readonly logger: Logger;
  private readonly metrics: MetricsCollector;

  /**
   * Every dependency is optional; omitted ones fall back to a new `EmbeddingService`, the shared
   * connection pool, a console logger and a no-op metrics collector.
   */
  constructor(
    embeddingService?: EmbeddingService,
    connectionPool?: ConnectionPool,
    logger?: Logger,
    metrics?: MetricsCollector
  ) {
    this.embeddingService = embeddingService ?? new EmbeddingService();
    this.connectionPool = connectionPool ?? getConnectionPool();
    this.logger = logger ?? new ConsoleLogger();
    this.metrics = metrics ?? new NoOpMetrics();
  }

  /** Borrows a client from the pool, re-throwing failures with a uniform message prefix. */
  private async getSQLiteClient(): Promise<SQLiteClient> {
    try {
      return await this.connectionPool.getSQLiteClient();
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      throw new Error(`Failed to get SQLiteClient: ${errorObj.message}`);
    }
  }

  /**
   * Validates `document`, inserts it (or updates it if a document with the same `uri` exists),
   * and (re)generates embeddings for its chunks.
   *
   * @returns `ok(docId)` on success; otherwise `err` with code `VALIDATION_ERROR`,
   *   `DB_QUERY_ERROR` or `ADD_DOCUMENT_ERROR`.
   */
  async addDocument(document: Document): Promise<Result<number, VectorRAGError>> {
    const startTime = Date.now();

    try {
      // Validate input
      const validated = DocumentSchema.safeParse(document);
      if (!validated.success) {
        return errResult(new VectorRAGError(
          `Invalid document format: ${validated.error.message}`,
          'VALIDATION_ERROR'
        ));
      }
      // Continue with the parsed value (as `search` does) rather than the raw input.
      const doc = validated.data;

      this.logger.info('Adding document with vectors', { title: doc.title, uri: doc.uri });

      const sqliteClient = await this.getSQLiteClient();

      // Check if document exists
      const existingDocResult = await sqliteClient.getDocument(doc.uri);
      if (existingDocResult.isErr()) {
        return errResult(new VectorRAGError(
          `Failed to check document existence: ${existingDocResult.error.message}`,
          'DB_QUERY_ERROR'
        ));
      }

      const existingDoc = existingDocResult.value;
      const result = existingDoc
        ? await this.updateDocumentWithVectors(doc, sqliteClient)
        : await this.addDocumentWithVectors(doc, sqliteClient);

      this.metrics.recordOperation('addDocument', Date.now() - startTime);
      this.logger.info('Document successfully processed', {
        uri: doc.uri,
        operation: existingDoc ? 'updated' : 'created'
      });

      return ok(result);
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      this.metrics.recordError('addDocument');
      this.logger.error('Failed to add document', errorObj, { uri: document.uri });

      return errResult(new VectorRAGError(
        `Failed to add document: ${errorObj.message}`,
        'ADD_DOCUMENT_ERROR',
        errorObj
      ));
    }
  }

  /**
   * Embeds `searchQuery.query` and returns up to `limit` stored chunks whose similarity is at
   * least `threshold`, optionally restricted by equality filters on document metadata keys.
   *
   * @returns `ok(results)` (possibly empty); otherwise `err` with code `VALIDATION_ERROR` or
   *   `SEARCH_ERROR`.
   */
  async search(searchQuery: SearchQuery): Promise<Result<VectorSearchResult[], VectorRAGError>> {
    const startTime = Date.now();

    try {
      // Validate input
      const validated = SearchQuerySchema.safeParse(searchQuery);
      if (!validated.success) {
        return errResult(new VectorRAGError(
          `Invalid search query: ${validated.error.message}`,
          'VALIDATION_ERROR'
        ));
      }

      const { query, limit, threshold } = validated.data;

      this.logger.info('Performing vector search', { query, limit, threshold });

      const sqliteClient = await this.getSQLiteClient();

      // Generate embedding for query
      const queryEmbeddingResult = await this.embeddingService.generateEmbedding(query);

      // Get all chunk embeddings with filtering support
      const chunkEmbeddings = await this.getAllChunkEmbeddings(validated.data.filters, sqliteClient);

      if (chunkEmbeddings.length === 0) {
        this.logger.warn('No embeddings found in database');
        return ok([]);
      }

      // Find most similar chunks
      const similarities = this.embeddingService.findMostSimilar(
        queryEmbeddingResult.embedding,
        chunkEmbeddings,
        limit
      );

      // Filter by similarity threshold
      const filteredSimilarities = similarities.filter(s => s.similarity >= threshold);

      // Get details for found chunks
      const results = await Promise.all(
        filteredSimilarities.map(async (sim): Promise<VectorSearchResult | null> => {
          const chunk = await this.getChunkWithDocument(sim.id as number, sqliteClient);
          if (!chunk) return null;

          return {
            text: chunk.text,
            section: chunk.section,
            uri: chunk.uri,
            title: chunk.title,
            similarity: sim.similarity,
            chunkId: sim.id as number,
            metadata: chunk.metadata,
            explanation: this.generateExplanation(query, chunk.text, sim.similarity)
          };
        })
      );

      const validResults = results.filter((r): r is VectorSearchResult => r !== null);

      this.metrics.recordOperation('search', Date.now() - startTime);
      this.metrics.recordSearchResults(validResults.length);

      this.logger.info('Search completed successfully', {
        query,
        resultsFound: validResults.length,
        threshold
      });

      return ok(validResults);
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      this.metrics.recordError('search');
      this.logger.error('Search failed', errorObj, { query: searchQuery.query });

      return errResult(new VectorRAGError(
        `Search failed: ${errorObj.message}`,
        'SEARCH_ERROR',
        errorObj
      ));
    }
  }

  /**
   * Loads a document by `uri`, reconstructing `content` by joining its chunks with blank lines.
   *
   * @returns `ok(null)` when no such document exists.
   */
  async getDocument(uri: string): Promise<Result<Document | null, VectorRAGError>> {
    try {
      this.logger.info('Retrieving document', { uri });

      const sqliteClient = await this.getSQLiteClient();

      const docResult = await sqliteClient.getDocument(uri);
      if (docResult.isErr()) {
        return errResult(new VectorRAGError(
          `Failed to get document: ${docResult.error.message}`,
          'GET_DOCUMENT_ERROR'
        ));
      }

      const doc = docResult.value;
      if (!doc) {
        this.logger.info('Document not found', { uri });
        return ok(null);
      }

      // Get document chunks
      const chunksResult = await sqliteClient.getDocumentChunks(doc.id);
      if (chunksResult.isErr()) {
        return errResult(new VectorRAGError(
          `Failed to get document chunks: ${chunksResult.error.message}`,
          'GET_CHUNKS_ERROR'
        ));
      }

      const chunks = chunksResult.value;
      const content = chunks.map(chunk => chunk.text).join('\n\n');

      const result: Document = {
        uri: doc.uri,
        title: doc.title,
        content,
        metadata: doc.metadata ? JSON.parse(doc.metadata) : undefined
      };

      return ok(result);
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      this.logger.error('Failed to retrieve document', errorObj, { uri });

      return errResult(new VectorRAGError(
        `Failed to retrieve document: ${errorObj.message}`,
        'GET_DOCUMENT_ERROR',
        errorObj
      ));
    }
  }

  /** Lists documents matching `filters`; `content` is left empty to keep listings cheap. */
  async listDocuments(filters?: Record<string, unknown>): Promise<Result<Document[], VectorRAGError>> {
    try {
      this.logger.info('Listing documents', { filters });

      const sqliteClient = await this.getSQLiteClient();

      const docsResult = await sqliteClient.getDocuments(filters);
      if (docsResult.isErr()) {
        return errResult(new VectorRAGError(
          `Failed to get documents: ${docsResult.error.message}`,
          'LIST_DOCUMENTS_ERROR'
        ));
      }

      const docs = docsResult.value;
      const result = docs.map(doc => ({
        uri: doc.uri,
        title: doc.title,
        content: '', // Don't load content for listings
        metadata: doc.metadata ? JSON.parse(doc.metadata) : undefined
      }));

      return ok(result);
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      this.logger.error('Failed to list documents', errorObj);

      return errResult(new VectorRAGError(
        `Failed to list documents: ${errorObj.message}`,
        'LIST_DOCUMENTS_ERROR',
        errorObj
      ));
    }
  }

  /** Counts rows in `docs`, `chunks` and `chunk_vecs` and reports the embedding index health. */
  async getDocumentStats(): Promise<Result<DocumentStats, VectorRAGError>> {
    try {
      const sqliteClient = await this.getSQLiteClient();

      const [docsResult, chunksResult, embeddingsResult, healthResult] = await Promise.all([
        sqliteClient.query<{ count: number }>('SELECT COUNT(*) as count FROM docs'),
        sqliteClient.query<{ count: number }>('SELECT COUNT(*) as count FROM chunks'),
        sqliteClient.query<{ count: number }>('SELECT COUNT(*) as count FROM chunk_vecs'),
        this.checkIndexHealth()
      ]);

      // Check for errors in query results
      if (docsResult.isErr()) {
        return errResult(new VectorRAGError(
          `Failed to get documents count: ${docsResult.error.message}`,
          'GET_STATS_ERROR'
        ));
      }
      if (chunksResult.isErr()) {
        return errResult(new VectorRAGError(
          `Failed to get chunks count: ${chunksResult.error.message}`,
          'GET_STATS_ERROR'
        ));
      }
      if (embeddingsResult.isErr()) {
        return errResult(new VectorRAGError(
          `Failed to get embeddings count: ${embeddingsResult.error.message}`,
          'GET_STATS_ERROR'
        ));
      }

      const stats: DocumentStats = {
        totalDocuments: docsResult.value[0]?.count ?? 0,
        totalChunks: chunksResult.value[0]?.count ?? 0,
        totalEmbeddings: embeddingsResult.value[0]?.count ?? 0,
        modelName: this.modelName,
        indexHealth: healthResult,
        // NOTE(review): this is the time of the stats call, not of the last indexing run.
        lastIndexed: new Date()
      };

      return ok(stats);
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      this.logger.error('Failed to get document stats', errorObj);

      return errResult(new VectorRAGError(
        `Failed to get document stats: ${errorObj.message}`,
        'GET_STATS_ERROR',
        errorObj
      ));
    }
  }

  /** @returns true only if both the embedding service and the database report connectivity. */
  async testConnection(): Promise<boolean> {
    try {
      const sqliteClient = await this.getSQLiteClient();
      const [embeddingTest, dbTest] = await Promise.all([
        this.embeddingService.testConnection(),
        sqliteClient.isConnected()
      ]);
      return embeddingTest && dbTest;
    } catch {
      return false;
    }
  }

  /**
   * Health check endpoint for monitoring. Each probe that throws is reported as a failed check
   * (status `unhealthy`) instead of rejecting the whole call.
   */
  async healthCheck(): Promise<{
    status: 'healthy' | 'unhealthy';
    checks: Record<string, boolean>;
    timestamp: Date;
  }> {
    const checks = {
      database: await this.runHealthProbe('database', async () => (await this.getSQLiteClient()).isConnected()),
      embedding: await this.runHealthProbe('embedding', () => this.embeddingService.testConnection()),
      memory: process.memoryUsage().heapUsed < 1024 * 1024 * 1024 // 1GB limit
    };

    const status = Object.values(checks).every(Boolean) ? 'healthy' : 'unhealthy';

    return {
      status,
      checks,
      timestamp: new Date()
    };
  }

  // Private helper methods

  /** Runs one health probe, converting a throw/rejection into `false` plus a warning log. */
  private async runHealthProbe(name: string, probe: () => boolean | Promise<boolean>): Promise<boolean> {
    try {
      return await probe();
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      this.logger.warn('Health check probe failed', { check: name, error: errorObj.message });
      return false;
    }
  }

  /** Inserts a new document row, then embeds its chunks. @returns the new document id. */
  private async addDocumentWithVectors(document: Document, sqliteClient: SQLiteClient): Promise<number> {
    const docIdResult = await sqliteClient.addDocument(
      document.uri,
      document.title,
      document.content,
      document.metadata ? JSON.stringify(document.metadata) : undefined
    );

    if (docIdResult.isErr()) {
      throw new Error(`Failed to add document: ${docIdResult.error.message}`);
    }

    const docId = docIdResult.value;
    await this.generateAndSaveEmbeddings(docId, sqliteClient);

    return docId;
  }

  /**
   * Updates an existing document, deletes the embeddings of its chunks and regenerates them.
   * @returns the existing document id.
   */
  private async updateDocumentWithVectors(document: Document, sqliteClient: SQLiteClient): Promise<number> {
    const docResult = await sqliteClient.getDocument(document.uri);
    if (docResult.isErr()) {
      throw new Error(`Failed to get document: ${docResult.error.message}`);
    }

    const doc = docResult.value;
    if (!doc) throw new Error('Document not found');

    const updateResult = await sqliteClient.updateDocument(
      document.uri,
      document.title,
      document.content,
      document.metadata ? JSON.stringify(document.metadata) : undefined
    );

    if (updateResult.isErr()) {
      throw new Error(`Failed to update document: ${updateResult.error.message}`);
    }

    // Clean up old embeddings
    const deleteResult = await sqliteClient.execute(
      'DELETE FROM chunk_vecs WHERE chunk_id IN (SELECT id FROM chunks WHERE doc_id = ?)',
      [doc.id]
    );

    if (deleteResult.isErr()) {
      throw new Error(`Failed to delete old embeddings: ${deleteResult.error.message}`);
    }

    await this.generateAndSaveEmbeddings(doc.id, sqliteClient);

    return doc.id;
  }

  /** Embeds every chunk of `docId` in batches of 10 and upserts the vectors into `chunk_vecs`. */
  private async generateAndSaveEmbeddings(docId: number, sqliteClient: SQLiteClient): Promise<void> {
    const chunksResult = await sqliteClient.getDocumentChunks(docId);
    if (chunksResult.isErr()) {
      throw new Error(`Failed to get document chunks: ${chunksResult.error.message}`);
    }

    const chunks = chunksResult.value;
    const chunkTexts = chunks.map(chunk => chunk.text);

    // Generate embeddings in batches to bound the size of each embedding-service request.
    const batchSize = 10;
    for (let i = 0; i < chunkTexts.length; i += batchSize) {
      const batch = chunkTexts.slice(i, i + batchSize);
      const batchChunks = chunks.slice(i, i + batchSize);

      const embeddings = await this.embeddingService.generateEmbeddings(batch);

      // Assumes generateEmbeddings returns one result per input, in input order — TODO confirm.
      await Promise.all(embeddings.map(async (embedding, idx) => {
        const chunk = batchChunks[idx];
        // `chunk` is undefined if the service returned more embeddings than inputs.
        if (chunk?.id) {
          await this.saveChunkEmbedding(chunk.id, embedding.embedding, this.modelName, sqliteClient);
        }
      }));
    }
  }

  /** Upserts one chunk's embedding, stored as raw Float32 bytes together with its dimension. */
  private async saveChunkEmbedding(chunkId: number, embedding: number[], model: string, sqliteClient: SQLiteClient): Promise<void> {
    const sql = `
      INSERT INTO chunk_vecs (chunk_id, dim, vec, model, created_at)
      VALUES (?, ?, ?, ?, datetime('now'))
      ON CONFLICT(chunk_id) DO UPDATE SET
        dim = excluded.dim,
        vec = excluded.vec,
        model = excluded.model,
        created_at = excluded.created_at
    `;

    // Float32 halves storage compared with float64; blobToArray decodes the same layout.
    const blob = Buffer.from(new Float32Array(embedding).buffer);
    const result = await sqliteClient.execute(sql, [chunkId, embedding.length, blob, model]);

    if (result.isErr()) {
      throw new Error(`Failed to save chunk embedding: ${result.error.message}`);
    }
  }

  /**
   * Loads every stored embedding for the current model, optionally restricted to documents
   * whose JSON metadata matches each `filters` entry by equality.
   */
  private async getAllChunkEmbeddings(
    filters?: Record<string, unknown>,
    sqliteClient?: SQLiteClient
  ): Promise<Array<{ id: number; embedding: number[] }>> {
    const client = sqliteClient ?? await this.getSQLiteClient();

    let sql = `
      SELECT cv.chunk_id, cv.dim, cv.vec, cv.model
      FROM chunk_vecs cv
      JOIN chunks c ON cv.chunk_id = c.id
      JOIN docs d ON c.doc_id = d.id
      WHERE cv.model = ?
    `;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- element type is dictated by SQLiteClient.query
    const params: any[] = [this.modelName];

    if (filters) {
      for (const [key, value] of Object.entries(filters)) {
        // The JSON path is bound as a parameter, never spliced into the SQL text: filter keys
        // come from caller-supplied search input and must not be able to alter the statement.
        sql += ' AND JSON_EXTRACT(d.metadata, ?) = ?';
        params.push(`$.${key}`, value);
      }
    }

    const results = await client.query<ChunkVectorRow>(sql, params);
    if (results.isErr()) {
      throw new Error(`Failed to get chunk embeddings: ${results.error.message}`);
    }

    return results.value.map(row => ({
      id: row.chunk_id,
      embedding: this.blobToArray(row.vec)
    }));
  }

  /** Fetches one chunk joined with its parent document, or null if the chunk id is unknown. */
  private async getChunkWithDocument(chunkId: number, sqliteClient?: SQLiteClient): Promise<{
    text: string;
    section: string;
    uri: string;
    title: string;
    metadata?: Record<string, unknown>;
  } | null> {
    const client = sqliteClient ?? await this.getSQLiteClient();

    const sql = `
      SELECT c.text, c.section, d.uri, d.title, d.metadata
      FROM chunks c
      JOIN docs d ON c.doc_id = d.id
      WHERE c.id = ?
    `;

    const results = await client.query<ChunkWithDocumentRow>(sql, [chunkId]);
    if (results.isErr()) {
      throw new Error(`Failed to get chunk with document: ${results.error.message}`);
    }

    const row = results.value[0];
    if (!row) return null;

    return {
      text: row.text,
      section: row.section,
      uri: row.uri,
      title: row.title,
      metadata: row.metadata ? JSON.parse(row.metadata) : undefined
    };
  }

  /**
   * Decodes a `chunk_vecs.vec` blob (raw Float32 bytes, as written by saveChunkEmbedding).
   * Trailing bytes that do not form a whole float are ignored.
   */
  private blobToArray(blob: Buffer): number[] {
    const elementSize = Float32Array.BYTES_PER_ELEMENT;
    // A Float32Array view needs a 4-byte-aligned offset, but driver-returned Buffers may be
    // slices of a shared pool at any offset; copy into a fresh (offset 0) buffer in that case.
    const bytes = blob.byteOffset % elementSize === 0 ? blob : new Uint8Array(blob);
    const floats = new Float32Array(bytes.buffer, bytes.byteOffset, Math.floor(bytes.byteLength / elementSize));
    return Array.from(floats);
  }

  /** Builds a short human-readable confidence note (>0.9 high, >0.7 medium, else low). */
  private generateExplanation(query: string, text: string, similarity: number): string {
    // Simple explanation generator - could be enhanced with LLM
    const confidenceLevel = similarity > 0.9 ? 'high' : similarity > 0.7 ? 'medium' : 'low';
    return `Found with ${confidenceLevel} confidence (${(similarity * 100).toFixed(1)}%) based on semantic similarity to "${query}"`;
  }

  /**
   * Compares embedding count to chunk count: none → unhealthy, below 90% coverage → degraded.
   * Any query failure is reported as unhealthy.
   */
  private async checkIndexHealth(): Promise<'healthy' | 'degraded' | 'unhealthy'> {
    try {
      const sqliteClient = await this.getSQLiteClient();

      const [totalChunks, totalEmbeddings] = await Promise.all([
        sqliteClient.query<{ count: number }>('SELECT COUNT(*) as count FROM chunks'),
        sqliteClient.query<{ count: number }>('SELECT COUNT(*) as count FROM chunk_vecs')
      ]);

      if (totalChunks.isErr() || totalEmbeddings.isErr()) {
        return 'unhealthy';
      }

      const chunkCount = totalChunks.value[0]?.count ?? 0;
      const embeddingCount = totalEmbeddings.value[0]?.count ?? 0;

      if (embeddingCount === 0) return 'unhealthy';
      if (embeddingCount < chunkCount * 0.9) return 'degraded';
      return 'healthy';
    } catch {
      return 'unhealthy';
    }
  }
}

// Supporting interfaces and classes

/** Logging sink injectable into {@link VectorRAGService}. */
export interface Logger {
  info(message: string, meta?: Record<string, unknown>): void;
  warn(message: string, meta?: Record<string, unknown>): void;
  error(message: string, error?: Error, meta?: Record<string, unknown>): void;
}

/** Metrics sink injectable into {@link VectorRAGService}. */
export interface MetricsCollector {
  recordOperation(operation: string, duration: number): void;
  recordError(operation: string): void;
  recordSearchResults(count: number): void;
}

/** Default logger: writes to the console, serializing `meta` as JSON. */
class ConsoleLogger implements Logger {
  info(message: string, meta?: Record<string, unknown>): void {
    console.log(`ℹ️ ${message}`, meta ? JSON.stringify(meta) : '');
  }

  warn(message: string, meta?: Record<string, unknown>): void {
    console.warn(`⚠️ ${message}`, meta ? JSON.stringify(meta) : '');
  }

  error(message: string, error?: Error, meta?: Record<string, unknown>): void {
    console.error(`❌ ${message}`, error?.message, meta ? JSON.stringify(meta) : '');
  }
}

/** Default metrics collector that discards everything. */
class NoOpMetrics implements MetricsCollector {
  recordOperation(): void {}
  recordError(): void {}
  recordSearchResults(): void {}
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Galiusbro/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server