streaming-handler.js (13.9 kB)
import sql from 'mssql';

/**
 * Streaming Data Handler for Large Query Results
 * Provides streaming capabilities to handle large datasets without memory issues
 */
export class StreamingHandler {
  constructor(config = {}) {
    this.config = {
      // Maximum rows to process in memory at once
      batchSize: config.batchSize || 1000,
      // Maximum memory usage before switching to streaming (in MB)
      maxMemoryMB: config.maxMemoryMB || 50,
      // Maximum response size before chunking (in characters)
      maxResponseSize: config.maxResponseSize || 1000000, // 1MB
      // Whether to enable streaming by default
      enableStreaming: config.enableStreaming ?? true,
      ...config
    };
  }

  /**
   * Executes a query with automatic streaming based on size
   * @param {object} request - SQL request object
   * @param {string} query - SQL query to execute
   * @param {object} context - Execution context
   * @returns {Promise<object>} Query result with streaming metadata
   */
  async executeQueryWithStreaming(request, query, context = {}) {
    const startTime = Date.now();

    // First, check if we should stream based on query complexity
    const shouldStream = await this.shouldStreamQuery(request, query, context);

    if (shouldStream) {
      return await this.executeStreamingQuery(request, query, context, startTime);
    } else {
      return await this.executeRegularQuery(request, query, context, startTime);
    }
  }

  /**
   * Determines if a query should be streamed
   * @param {object} request - SQL request object
   * @param {string} query - SQL query
   * @param {object} context - Execution context
   * @returns {Promise<boolean>} Whether to use streaming
   */
  async shouldStreamQuery(request, query, context = {}) {
    if (!this.config.enableStreaming) {
      return false;
    }

    // Force streaming for specific operations
    if (context.forceStreaming) {
      return true;
    }

    // For table data operations, check table size first if possible
    if (context.tableName && context.schema) {
      try {
        // Escape embedded single quotes so the metadata lookup is injection-safe
        const safeTable = String(context.tableName).replace(/'/g, "''");
        const safeSchema = String(context.schema).replace(/'/g, "''");
        const sizeQuery = `
          SELECT
            SUM(p.rows) as estimated_rows,
            SUM(a.total_pages) * 8 / 1024 as estimated_size_mb
          FROM sys.tables t
          INNER JOIN sys.partitions p ON t.object_id = p.object_id
          INNER JOIN sys.allocation_units a ON p.partition_id = a.container_id
          INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
          WHERE t.name = '${safeTable}' AND s.name = '${safeSchema}'
            AND p.index_id <= 1
        `;

        const sizeResult = await request.query(sizeQuery);
        const stats = sizeResult.recordset[0];

        // Stream if table has > 10k rows or > 10MB
        return stats.estimated_rows > 10000 || stats.estimated_size_mb > 10;
      } catch (error) {
        console.warn('Could not determine table size for streaming decision:', error.message);
        return false;
      }
    }

    // Check if query indicates a large result set
    const largeResultIndicators = [
      /SELECT\s+\*\s+FROM\s+\w+\s*$/i, // SELECT * without WHERE/LIMIT
      /BULK\s+/i,
      /EXPORT\s+/i,
      /BACKUP\s+/i
    ];

    if (largeResultIndicators.some(pattern => pattern.test(query))) {
      return true;
    }

    return false;
  }

  /**
   * Executes a regular (non-streaming) query
   * @param {object} request - SQL request object
   * @param {string} query - SQL query
   * @param {object} context - Execution context
   * @param {number} startTime - Execution start time
   * @returns {Promise<object>} Query result
   */
  async executeRegularQuery(request, query, context, startTime) {
    const result = await request.query(query);

    return {
      success: true,
      recordset: result.recordset,
      recordsets: result.recordsets,
      rowsAffected: result.rowsAffected,
      streaming: false,
      performance: {
        duration: Date.now() - startTime,
        rowCount: result.recordset ? result.recordset.length : 0,
        memoryUsed: this.estimateMemoryUsage(result.recordset)
      }
    };
  }

  /**
   * Executes a query with streaming processing
   * @param {object} request - SQL request object
   * @param {string} query - SQL query
   * @param {object} context - Execution context
   * @param {number} startTime - Execution start time
   * @returns {Promise<object>} Streaming query result
   */
  async executeStreamingQuery(request, query, context, startTime) {
    const chunks = [];
    let totalRows = 0;
    let currentBatch = [];
    let chunkCount = 0;

    return new Promise((resolve, reject) => {
      const streamRequest = new sql.Request(request.parent);
      streamRequest.stream = true;

      streamRequest.on('recordset', columns => {
        // Record the column metadata
        context.columns = columns;
      });

      streamRequest.on('row', row => {
        totalRows++;
        currentBatch.push(row);

        // Process batch when it reaches the configured size
        if (currentBatch.length >= this.config.batchSize) {
          this.processBatch(currentBatch, chunks, ++chunkCount, context);
          currentBatch = [];
        }
      });

      streamRequest.on('done', result => {
        // Process final batch if any rows remain
        if (currentBatch.length > 0) {
          this.processBatch(currentBatch, chunks, ++chunkCount, context);
        }

        resolve({
          success: true,
          streaming: true,
          chunks: chunks,
          chunkCount: chunkCount,
          totalRows: totalRows,
          rowsAffected: result.rowsAffected,
          performance: {
            duration: Date.now() - startTime,
            rowCount: totalRows,
            avgBatchSize: totalRows / chunkCount || 0,
            memoryEfficient: true
          }
        });
      });

      streamRequest.on('error', error => {
        reject(error);
      });

      // Execute the streaming query
      streamRequest.query(query);
    });
  }

  /**
   * Processes a batch of rows for streaming
   * @param {array} batch - Batch of rows
   * @param {array} chunks - Array to store chunks
   * @param {number} chunkNumber - Current chunk number
   * @param {object} context - Processing context
   */
  processBatch(batch, chunks, chunkNumber, context) {
    let processedData;

    // Process based on output format
    switch (context.outputFormat) {
      case 'csv':
        processedData = this.batchToCsv(batch, context);
        break;
      case 'json':
        processedData = this.batchToJson(batch, context);
        break;
      default:
        processedData = batch;
    }

    chunks.push({
      chunkNumber,
      data: processedData,
      rowCount: batch.length,
      size: JSON.stringify(processedData).length
    });
  }

  /**
   * Converts a batch to CSV format
   * @param {array} batch - Batch of rows
   * @param {object} context - Processing context
   * @returns {string} CSV data
   */
  batchToCsv(batch, context) {
    if (batch.length === 0) return '';

    let csvContent = '';

    // Add header only for first chunk
    if (!context.csvHeaderAdded) {
      const headers = Object.keys(batch[0]);
      csvContent += headers.join(',') + '\n';
      context.csvHeaderAdded = true;
    }

    // Add data rows
    for (const row of batch) {
      const values = Object.values(row).map(value => {
        if (value === null || value === undefined) return '';
        const stringValue = String(value);
        // Escape commas, quotes, and newlines
        if (stringValue.includes(',') || stringValue.includes('"') || stringValue.includes('\n')) {
          return `"${stringValue.replace(/"/g, '""')}"`;
        }
        return stringValue;
      });
      csvContent += values.join(',') + '\n';
    }

    return csvContent;
  }

  /**
   * Converts a batch to JSON format
   * @param {array} batch - Batch of rows
   * @param {object} context - Processing context
   * @returns {string} JSON data
   */
  batchToJson(batch, context) {
    return JSON.stringify(batch, null, context.prettyPrint ? 2 : undefined);
  }

  /**
   * Executes a table export with streaming
   * @param {object} request - SQL request object
   * @param {string} tableName - Table to export
   * @param {object} options - Export options
   * @returns {Promise<object>} Export result
   */
  async streamTableExport(request, tableName, options = {}) {
    const {
      schema = 'dbo',
      database = null,
      limit = null,
      whereClause = null,
      outputFormat = 'csv'
    } = options;

    // Build the streaming query; escape "]" as "]]" so bracketed identifiers
    // cannot be broken out of, and coerce limit to an integer
    const safeSchema = String(schema).replace(/]/g, ']]');
    const safeTable = String(tableName).replace(/]/g, ']]');
    let query = `SELECT${limit ? ` TOP ${Number.parseInt(limit, 10)}` : ''} * FROM [${safeSchema}].[${safeTable}]`;

    // Note: whereClause is interpolated verbatim; callers must ensure it is trusted
    if (whereClause) {
      query += ` WHERE ${whereClause}`;
    }

    const context = {
      tableName,
      schema,
      database,
      outputFormat,
      forceStreaming: true,
      csvHeaderAdded: false
    };

    // Switch to target database if specified
    if (database) {
      await request.query(`USE [${String(database).replace(/]/g, ']]')}]`);
    }

    return await this.executeQueryWithStreaming(request, query, context);
  }

  /**
   * Estimates memory usage of a recordset
   * @param {array} recordset - Query recordset
   * @returns {number} Estimated memory usage in MB
   */
  estimateMemoryUsage(recordset) {
    if (!recordset || recordset.length === 0) return 0;

    // Rough estimation based on JSON serialization size
    const sampleSize = Math.min(recordset.length, 100);
    const sampleData = recordset.slice(0, sampleSize);
    const avgRowSize = JSON.stringify(sampleData).length / sampleSize;

    return (avgRowSize * recordset.length) / (1024 * 1024); // Convert to MB
  }

  /**
   * Reconstructs full data from streaming chunks
   * @param {array} chunks - Array of data chunks
   * @param {string} outputFormat - Output format ('json', 'csv', 'raw')
   * @returns {*} Reconstructed data
   */
  reconstructFromChunks(chunks, outputFormat = 'json') {
    if (!chunks || chunks.length === 0) {
      return outputFormat === 'csv' ? '' : [];
    }

    switch (outputFormat) {
      case 'csv':
        return chunks.map(chunk => chunk.data).join('');
      case 'json': {
        const allRows = [];
        chunks.forEach(chunk => {
          if (typeof chunk.data === 'string') {
            try {
              // Safe JSON parsing with validation
              const parsedData = this._safeJsonParse(chunk.data);
              if (Array.isArray(parsedData)) {
                allRows.push(...parsedData);
              } else {
                throw new Error('Parsed JSON data is not an array');
              }
            } catch (error) {
              throw new Error(`Invalid JSON chunk data: ${error.message}`);
            }
          } else {
            allRows.push(...chunk.data);
          }
        });
        return allRows;
      }
      default:
        return chunks.flatMap(chunk => chunk.data);
    }
  }

  /**
   * Updates streaming configuration
   * @param {object} newConfig - New configuration options
   */
  updateConfig(newConfig) {
    this.config = { ...this.config, ...newConfig };
  }

  /**
   * Gets current streaming configuration
   * @returns {object} Current configuration
   */
  getConfig() {
    return { ...this.config };
  }

  /**
   * Gets streaming statistics
   * @param {object} streamingResult - Result from streaming operation
   * @returns {object} Statistics object
   */
  getStreamingStats(streamingResult) {
    if (!streamingResult.streaming) {
      return {
        streaming: false,
        memoryEfficient: false,
        // For non-streaming results the row count lives under performance
        totalRows: streamingResult.performance?.rowCount || 0
      };
    }

    return {
      streaming: true,
      memoryEfficient: true,
      totalRows: streamingResult.totalRows,
      chunkCount: streamingResult.chunkCount,
      avgChunkSize: streamingResult.totalRows / streamingResult.chunkCount || 0,
      performance: streamingResult.performance
    };
  }

  /**
   * Safely parses JSON with validation and error handling
   * @param {string} jsonString - JSON string to parse
   * @returns {*} Parsed JSON data
   * @throws {Error} If JSON is invalid or potentially malicious
   * @private
   */
  _safeJsonParse(jsonString) {
    if (typeof jsonString !== 'string') {
      throw new Error('Input must be a string');
    }

    // Check for reasonable size limits (prevent DoS via large JSON)
    if (jsonString.length > 10 * 1024 * 1024) { // 10MB limit
      throw new Error('JSON data exceeds maximum size limit (10MB)');
    }

    // Parse JSON with error handling
    let parsed;
    try {
      parsed = JSON.parse(jsonString);
    } catch (error) {
      throw new Error(`JSON parsing failed: ${error.message}`);
    }

    // Validate that parsed data doesn't contain prototype pollution attempts
    if (parsed && typeof parsed === 'object') {
      this._validateJsonSafety(parsed);
    }

    return parsed;
  }

  /**
   * Validates that JSON data doesn't contain prototype pollution attempts
   * @param {*} obj - Object to validate
   * @throws {Error} If dangerous patterns are detected
   * @private
   */
  _validateJsonSafety(obj) {
    if (obj === null || typeof obj !== 'object') {
      return;
    }

    // Check for prototype pollution attempts
    const dangerousKeys = ['__proto__', 'constructor', 'prototype'];

    if (Array.isArray(obj)) {
      for (const item of obj) {
        this._validateJsonSafety(item);
      }
    } else {
      for (const key in obj) {
        if (dangerousKeys.includes(key)) {
          throw new Error(`Potentially dangerous JSON key detected: ${key}`);
        }
        // Recursively validate nested objects
        if (obj[key] && typeof obj[key] === 'object') {
          this._validateJsonSafety(obj[key]);
        }
      }
    }
  }
}
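
A minimal usage sketch, separate from the file above: the connection settings, database, and table names below are illustrative placeholders, and it assumes a reachable SQL Server and Node with ES modules (top-level await).

import sql from 'mssql';
import { StreamingHandler } from './streaming-handler.js';

// Illustrative connection settings -- replace with real values
const pool = await sql.connect({
  server: 'localhost',
  database: 'SalesDb',          // hypothetical database
  user: 'report_reader',        // hypothetical login
  password: 'example-password', // placeholder
  options: { trustServerCertificate: true }
});

const handler = new StreamingHandler({ batchSize: 500 });

// Export a hypothetical dbo.Orders table as CSV; streamTableExport forces streaming
const result = await handler.streamTableExport(pool.request(), 'Orders', {
  schema: 'dbo',
  outputFormat: 'csv',
  limit: 10000
});

// Streaming results arrive as chunks; stitch them back into one CSV string
const csv = handler.reconstructFromChunks(result.chunks, 'csv');
console.log(handler.getStreamingStats(result));
console.log(csv.split('\n').slice(0, 5).join('\n')); // preview the first rows

await pool.close();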


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/egarcia74/warp-sql-server-mcp'
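
For programmatic access, here is a minimal Node 18+ sketch of the same request. It assumes the endpoint returns JSON; since the response shape is not documented here, the example simply prints the parsed object.

const res = await fetch('https://glama.ai/api/mcp/v1/servers/egarcia74/warp-sql-server-mcp');
if (!res.ok) throw new Error(`MCP API request failed: ${res.status}`);
const server = await res.json();
console.log(server);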

If you have feedback or need assistance with the MCP directory API, please join our Discord server.