Skip to main content
Glama
aggregation-pipeline.js (29.3 kB)
/**
 * MongoDB Aggregation Pipeline Implementation
 *
 * Provides a complete aggregation framework for processing documents:
 * - Pipeline stages ($match, $group, $sort, $project, etc.)
 * - Aggregation expressions and operators
 * - Statistical and mathematical functions
 * - Array and string manipulation
 */

import { EventEmitter } from 'events';

/** Own-property test that also works for objects without a prototype. */
const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key);

/** True for object literals / Object.create(null) objects; false for Date, RegExp, arrays, class instances. */
function isPlainObject(value) {
  if (value === null || typeof value !== 'object') return false;
  const proto = Object.getPrototypeOf(value);
  return proto === Object.prototype || proto === null;
}

/** Structural equality for primitives, Dates, arrays and plain objects; identity for everything else. */
function valuesEqual(a, b) {
  if (a === b) return true;
  if (a instanceof Date && b instanceof Date) return a.getTime() === b.getTime();
  if (Array.isArray(a) && Array.isArray(b)) {
    return a.length === b.length && a.every((item, i) => valuesEqual(item, b[i]));
  }
  if (isPlainObject(a) && isPlainObject(b)) {
    const aKeys = Object.keys(a);
    return aKeys.length === Object.keys(b).length &&
      aKeys.every(key => hasOwn(b, key) && valuesEqual(a[key], b[key]));
  }
  return false;
}

/** Three-way comparison that orders null/undefined before every other value. */
function compareValues(a, b) {
  const aMissing = a === null || a === undefined;
  const bMissing = b === null || b === undefined;
  if (aMissing || bMissing) {
    if (aMissing && bMissing) return 0;
    return aMissing ? -1 : 1;
  }
  if (a < b) return -1;
  if (a > b) return 1;
  return 0;
}

/** Split a dotted path for writing; rejects `__proto__` so a path can never reach a prototype object. */
function splitWritablePath(path) {
  const parts = String(path).split('.');
  if (parts.includes('__proto__')) {
    throw new Error(`Unsafe field path: ${path}`);
  }
  return parts;
}

/** Shallow-copy a container found along a path; returns null when the value cannot be descended into. */
function copyContainer(value) {
  if (Array.isArray(value)) return value.slice();
  if (isPlainObject(value)) return { ...value };
  return null;
}

/** Return a copy of `doc` with `parts` set to `value`, cloning only the objects along the path. */
function setPathCopy(doc, parts, value) {
  const root = { ...doc };
  let cursor = root;
  for (let i = 0; i < parts.length - 1; i++) {
    const part = parts[i];
    const child = hasOwn(cursor, part) ? cursor[part] : undefined;
    const copy = copyContainer(child) || {};
    cursor[part] = copy;
    cursor = copy;
  }
  cursor[parts[parts.length - 1]] = value;
  return root;
}

/** Return a copy of `doc` with `parts` removed, cloning only the objects along the path. */
function unsetPathCopy(doc, parts) {
  const root = { ...doc };
  let cursor = root;
  for (let i = 0; i < parts.length - 1; i++) {
    const part = parts[i];
    const copy = hasOwn(cursor, part) ? copyContainer(cursor[part]) : null;
    if (copy === null) return root; // nothing to remove below this point
    cursor[part] = copy;
    cursor = copy;
  }
  delete cursor[parts[parts.length - 1]];
  return root;
}

/** Unwrap the `[arg]` spelling that single-argument operators also accept. */
function unaryOperand(operands) {
  return Array.isArray(operands) && operands.length === 1 ? operands[0] : operands;
}

/** UTC date-part extractors keyed by operator name. */
const DATE_PART_GETTERS = new Map([
  ['$year', date => date.getUTCFullYear()],
  ['$month', date => date.getUTCMonth() + 1],
  ['$dayOfMonth', date => date.getUTCDate()],
  ['$hour', date => date.getUTCHours()],
  ['$minute', date => date.getUTCMinutes()],
  ['$second', date => date.getUTCSeconds()]
]);

/**
 * Aggregation Pipeline Executor
 *
 * Runs an array of stage objects over an in-memory list of documents.
 * Stages never modify the documents or arrays they are given; they return
 * new arrays and (where a document changes) new document objects.
 */
export class AggregationPipeline extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.maxDocuments=100000] - Abort when a stage outputs more documents than this.
   * @param {number} [options.maxMemory] - Stored on `this.options`; not read anywhere in this module.
   * @param {boolean} [options.allowDiskUse=false] - Stored on `this.options`; not read anywhere in this module.
   * @param {boolean} [options.explain=false] - When true, `execute` returns results plus execution statistics.
   * @param {object} [options.logger] - Object with `debug`, `info`, `warn`, `error` methods.
   */
  constructor(options = {}) {
    super();

    this.options = {
      maxDocuments: options.maxDocuments || 100000,
      maxMemory: options.maxMemory || 100 * 1024 * 1024, // 100MB
      allowDiskUse: options.allowDiskUse || false,
      explain: options.explain || false
    };

    // Statistics
    this.stats = {
      pipelines: 0,
      stages: 0,
      documentsProcessed: 0,
      executionTime: 0,
      memoryUsed: 0
    };

    // Stage handlers
    this.stageHandlers = new Map([
      ['$match', this.stageMatch.bind(this)],
      ['$project', this.stageProject.bind(this)],
      ['$group', this.stageGroup.bind(this)],
      ['$sort', this.stageSort.bind(this)],
      ['$limit', this.stageLimit.bind(this)],
      ['$skip', this.stageSkip.bind(this)],
      ['$unwind', this.stageUnwind.bind(this)],
      ['$lookup', this.stageLookup.bind(this)],
      ['$addFields', this.stageAddFields.bind(this)],
      ['$set', this.stageAddFields.bind(this)], // Alias for $addFields
      ['$unset', this.stageUnset.bind(this)],
      ['$replaceRoot', this.stageReplaceRoot.bind(this)],
      ['$replaceWith', this.stageReplaceRoot.bind(this)], // Alias
      ['$count', this.stageCount.bind(this)],
      ['$facet', this.stageFacet.bind(this)],
      ['$bucket', this.stageBucket.bind(this)],
      ['$bucketAuto', this.stageBucketAuto.bind(this)],
      ['$sample', this.stageSample.bind(this)],
      ['$merge', this.stageMerge.bind(this)],
      ['$out', this.stageOut.bind(this)]
    ]);

    // Expression evaluator
    this.expressionEvaluator = new ExpressionEvaluator();

    // Logging
    this.logger = options.logger || this.createDefaultLogger();
  }

  /**
   * Build the fallback logger: `debug` is a no-op, the other levels go to the console.
   * NOTE(review): `info` is routed to `console.error` — presumably to keep stdout clean; confirm.
   */
  createDefaultLogger() {
    return {
      debug: () => {},
      info: console.error,
      warn: console.warn,
      error: console.error
    };
  }

  /**
   * Execute aggregation pipeline
   *
   * @param {Iterable<object>} documents - Input documents (copied into a new array, never mutated).
   * @param {object[]} pipeline - Stage objects, each with exactly one `$stage` key.
   * @param {object} [context] - Passed to every stage; `$lookup` reads `context.collections`.
   * @returns {Promise<object[]|{result: object[], executionStats: object}>} The output documents, or
   *   an explain envelope when `options.explain` is set.
   * @throws {Error} On a non-array pipeline, a malformed or unknown stage, or when a stage
   *   outputs more than `options.maxDocuments` documents. Errors are logged, then rethrown.
   */
  async execute(documents, pipeline, context = {}) {
    const startTime = Date.now();
    this.stats.pipelines++;

    if (!Array.isArray(pipeline)) {
      throw new Error('Pipeline must be an array');
    }

    const input = Array.from(documents);

    try {
      const { documents: output, executionPlan } = await this.runStages(input, pipeline, context);

      const totalTime = Date.now() - startTime;
      this.stats.executionTime += totalTime;

      if (this.options.explain) {
        return {
          result: output,
          executionStats: {
            executionTimeMillis: totalTime,
            totalDocsExamined: input.length,
            totalDocsReturned: output.length,
            executionStages: executionPlan
          }
        };
      }

      return output;
    } catch (error) {
      this.logger.error('Pipeline execution error:', error);
      throw error;
    }
  }

  /**
   * Run every stage in order and collect per-stage statistics.
   * Shared by `execute` and `$facet` so sub-pipelines always yield plain arrays.
   *
   * @param {object[]} documents - Input array (owned by the caller).
   * @param {object[]} pipeline - Stage objects.
   * @param {object} context - Execution context forwarded to each handler.
   * @returns {Promise<{documents: object[], executionPlan: object[]}>}
   */
  async runStages(documents, pipeline, context) {
    let current = documents;
    const executionPlan = [];

    for (const stage of pipeline) {
      if (stage === null || typeof stage !== 'object') {
        throw new Error('Pipeline stage must be an object');
      }
      const stageNames = Object.keys(stage);
      if (stageNames.length !== 1) {
        throw new Error(`Pipeline stage must contain exactly one field, got ${stageNames.length}`);
      }

      const stageName = stageNames[0];
      const handler = this.stageHandlers.get(stageName);
      if (!handler) {
        throw new Error(`Unknown pipeline stage: ${stageName}`);
      }

      this.logger.debug(`Executing stage ${stageName} on ${current.length} documents`);

      // Capture the size before the handler runs: this is the stage's real input.
      const inputDocs = current.length;
      const stageStart = Date.now();
      current = await handler(current, stage[stageName], context);

      executionPlan.push({
        stage: stageName,
        inputDocs,
        outputDocs: current.length,
        executionTime: Date.now() - stageStart
      });

      this.stats.stages++;
      this.stats.documentsProcessed += current.length;

      // Check limits
      if (current.length > this.options.maxDocuments) {
        throw new Error(`Document limit exceeded: ${current.length} > ${this.options.maxDocuments}`);
      }
    }

    return { documents: current, executionPlan };
  }

  /**
   * $match stage - Filter documents
   */
  async stageMatch(documents, filter) {
    return documents.filter(doc => this.matchesFilter(doc, filter));
  }

  /**
   * $project stage - Reshape documents
   */
  async stageProject(documents, projection) {
    return documents.map(doc => this.projectDocument(doc, projection));
  }

  /**
   * $group stage - Group documents
   *
   * Documents whose `_id` expression is missing are grouped under `_id: null`.
   * Group keys keep their evaluated value (e.g. Dates stay Dates).
   */
  async stageGroup(documents, groupSpec) {
    const groups = new Map();

    for (const doc of documents) {
      const evaluated = this.expressionEvaluator.evaluate(groupSpec._id, doc);
      const key = evaluated === undefined ? null : evaluated;
      // The JSON form is only the Map key; the real value is kept alongside the docs.
      const keyStr = JSON.stringify(key);

      let group = groups.get(keyStr);
      if (!group) {
        group = { key, docs: [] };
        groups.set(keyStr, group);
      }
      group.docs.push(doc);
    }

    const accumulators = Object.entries(groupSpec).filter(([field]) => field !== '_id');

    return Array.from(groups.values(), ({ key, docs }) => {
      const result = { _id: key };
      for (const [field, accumulator] of accumulators) {
        result[field] = this.calculateAccumulator(accumulator, docs);
      }
      return result;
    });
  }

  /**
   * $sort stage - Sort documents
   *
   * Returns a new array. A direction of `1` is ascending; any other value is descending.
   * null/undefined values sort before all others in ascending order.
   */
  async stageSort(documents, sortSpec) {
    const sortKeys = Object.entries(sortSpec);

    return [...documents].sort((a, b) => {
      for (const [field, direction] of sortKeys) {
        const order = compareValues(this.getFieldValue(a, field), this.getFieldValue(b, field));
        if (order !== 0) {
          return direction === 1 ? order : -order;
        }
      }
      return 0;
    });
  }

  /**
   * $limit stage - Limit number of documents
   */
  async stageLimit(documents, limit) {
    return documents.slice(0, limit);
  }

  /**
   * $skip stage - Skip documents
   */
  async stageSkip(documents, skip) {
    return documents.slice(skip);
  }

  /**
   * $unwind stage - Deconstruct array field
   *
   * Accepts either a `"$path"` string or `{ path, preserveNullAndEmptyArrays, includeArrayIndex }`.
   * Documents whose field is not a non-empty array are dropped unless
   * `preserveNullAndEmptyArrays` is set, in which case they pass through unchanged.
   */
  async stageUnwind(documents, unwindSpec) {
    const isShorthand = typeof unwindSpec === 'string';
    const path = isShorthand ? unwindSpec : unwindSpec.path;
    const fieldName = path.startsWith('$') ? path.substring(1) : path;
    const preserveNull = (!isShorthand && unwindSpec.preserveNullAndEmptyArrays) || false;
    const includeIndex = isShorthand ? undefined : unwindSpec.includeArrayIndex;
    const parts = splitWritablePath(fieldName);

    const results = [];
    for (const doc of documents) {
      const array = this.getFieldValue(doc, fieldName);

      if (!Array.isArray(array) || array.length === 0) {
        if (preserveNull) {
          results.push(doc);
        }
        continue;
      }

      array.forEach((item, index) => {
        // Copy-on-write along the path so siblings and the source document stay independent.
        const newDoc = setPathCopy(doc, parts, item);
        if (includeIndex) {
          newDoc[includeIndex] = index;
        }
        results.push(newDoc);
      });
    }

    return results;
  }

  /**
   * $lookup stage - Join with another collection
   *
   * The foreign collection is read from `context.collections[from]`; a missing
   * collection behaves like an empty one. Values are matched with `===`.
   */
  async stageLookup(documents, lookupSpec, context = {}) {
    const { from, localField, foreignField, as } = lookupSpec;

    // Get foreign collection (would need collection manager in real implementation)
    const foreignDocs = context.collections ? context.collections[from] || [] : [];

    return documents.map(doc => {
      const localValue = this.getFieldValue(doc, localField);
      const matched = foreignDocs.filter(
        foreign => this.getFieldValue(foreign, foreignField) === localValue
      );
      return { ...doc, [as]: matched };
    });
  }

  /**
   * $addFields stage - Add new fields
   *
   * Expressions are evaluated against the original document; dotted field
   * names write into embedded documents.
   */
  async stageAddFields(documents, fields) {
    const assignments = Object.entries(fields).map(
      ([field, expression]) => [splitWritablePath(field), expression]
    );

    return documents.map(doc => {
      let newDoc = { ...doc };
      for (const [parts, expression] of assignments) {
        newDoc = setPathCopy(newDoc, parts, this.expressionEvaluator.evaluate(expression, doc));
      }
      return newDoc;
    });
  }

  /**
   * $unset stage - Remove fields
   *
   * Accepts one field name or an array of them; dotted names remove embedded fields.
   */
  async stageUnset(documents, fields) {
    const fieldList = Array.isArray(fields) ? fields : [fields];
    const paths = fieldList.map(splitWritablePath);

    return documents.map(doc => paths.reduce(unsetPathCopy, { ...doc }));
  }

  /**
   * $replaceRoot stage - Replace document root
   *
   * Accepts `{ newRoot: expr }` ($replaceRoot) or a bare expression ($replaceWith).
   * A field reference that resolves to a falsy value yields `{}`.
   */
  async stageReplaceRoot(documents, replaceSpec) {
    const newRoot = replaceSpec.newRoot || replaceSpec;
    const isReference = typeof newRoot === 'string' && newRoot.startsWith('$');

    return documents.map(doc => {
      const value = this.expressionEvaluator.evaluate(newRoot, doc);
      return isReference ? value || {} : value;
    });
  }

  /**
   * $count stage - Count documents
   */
  async stageCount(documents, fieldName) {
    return [{ [fieldName]: documents.length }];
  }

  /**
   * $facet stage - Multiple pipelines
   *
   * Each sub-pipeline runs over its own copy of the input array and receives the
   * same context as the parent. Facet values are always arrays, even in explain mode.
   */
  async stageFacet(documents, facetSpec, context = {}) {
    const result = {};

    for (const [name, pipeline] of Object.entries(facetSpec)) {
      if (!Array.isArray(pipeline)) {
        throw new Error('Pipeline must be an array');
      }
      const facet = await this.runStages(Array.from(documents), pipeline, context);
      result[name] = facet.documents;
    }

    return [result];
  }

  /**
   * Copy the accumulator results described by `output` onto a bucket document.
   */
  applyBucketOutput(bucket, output, docs) {
    if (output) {
      for (const [field, accumulator] of Object.entries(output)) {
        bucket[field] = this.calculateAccumulator(accumulator, docs);
      }
    }
    return bucket;
  }

  /**
   * $bucket stage - Categorize into buckets
   *
   * A value belongs to the bucket `[boundaries[i], boundaries[i + 1])`. Values outside
   * every range go to `default` when it is given and are dropped otherwise.
   * Empty buckets are omitted from the output.
   */
  async stageBucket(documents, bucketSpec) {
    const { groupBy, boundaries, default: defaultBucket, output } = bucketSpec;
    const hasDefault = defaultBucket !== undefined;
    const lastLower = boundaries.length - 1;

    // Initialize buckets
    const buckets = new Map();
    for (let i = 0; i < lastLower; i++) {
      buckets.set(boundaries[i], []);
    }
    if (hasDefault) {
      buckets.set(defaultBucket, []);
    }

    // Categorize documents
    for (const doc of documents) {
      const value = this.expressionEvaluator.evaluate(groupBy, doc);

      let target;
      for (let i = 0; i < lastLower; i++) {
        if (value >= boundaries[i] && value < boundaries[i + 1]) {
          target = buckets.get(boundaries[i]);
          break;
        }
      }
      if (!target && hasDefault) {
        target = buckets.get(defaultBucket);
      }
      if (target) {
        target.push(doc);
      }
    }

    // Build results
    const results = [];
    for (const [boundary, docs] of buckets) {
      if (docs.length === 0) continue;
      results.push(this.applyBucketOutput({ _id: boundary, count: docs.length }, output, docs));
    }
    return results;
  }

  /**
   * $bucketAuto stage - Auto-generate buckets
   *
   * Sorts documents by the numeric `groupBy` value and slices them into at most
   * `buckets` chunks of `ceil(n / buckets)` documents each.
   */
  async stageBucketAuto(documents, bucketSpec) {
    const { groupBy, buckets: numBuckets, output } = bucketSpec;

    // Calculate values and sort
    const values = documents
      .map(doc => ({ value: this.expressionEvaluator.evaluate(groupBy, doc), doc }))
      .sort((a, b) => a.value - b.value);

    if (values.length === 0) return [];

    // Create buckets
    const bucketSize = Math.ceil(values.length / numBuckets);
    const results = [];

    for (let i = 0; i < numBuckets && i * bucketSize < values.length; i++) {
      const start = i * bucketSize;
      const end = Math.min(start + bucketSize, values.length);
      const bucketDocs = values.slice(start, end).map(entry => entry.doc);
      if (bucketDocs.length === 0) continue;

      const bucket = {
        _id: { min: values[start].value, max: values[end - 1].value },
        count: bucketDocs.length
      };
      results.push(this.applyBucketOutput(bucket, output, bucketDocs));
    }

    return results;
  }

  /**
   * $sample stage - Random sample
   *
   * Returns the input unchanged when `size` covers every document.
   */
  async stageSample(documents, sampleSpec) {
    const { size } = sampleSpec;

    if (size >= documents.length) {
      return documents;
    }

    // Fisher-Yates shuffle for random sampling
    const shuffled = [...documents];
    for (let i = shuffled.length - 1; i > 0; i--) {
      const j = Math.floor(Math.random() * (i + 1));
      [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
    }
    return shuffled.slice(0, size);
  }

  /**
   * $merge stage - Merge into collection
   *
   * Placeholder: only logs the intent and passes the documents through.
   */
  async stageMerge(documents, mergeSpec, context) {
    // In real implementation, would merge into target collection
    this.logger.info(`Would merge ${documents.length} documents into ${mergeSpec.into}`);
    return documents;
  }

  /**
   * $out stage - Output to collection
   *
   * Placeholder: only logs the intent and passes the documents through.
   */
  async stageOut(documents, collection, context) {
    // In real implementation, would output to target collection
    this.logger.info(`Would output ${documents.length} documents to ${collection}`);
    return documents;
  }

  /**
   * Calculate accumulator value
   *
   * @param {object} accumulator - Single-key object such as `{ $sum: '$amount' }`.
   * @param {object[]} documents - Documents of one group or bucket.
   * @returns {*} The accumulated value.
   * @throws {Error} When the accumulator operator is not supported.
   */
  calculateAccumulator(accumulator, documents) {
    const [op, expression] = Object.entries(accumulator)[0];
    const evaluate = doc => this.expressionEvaluator.evaluate(expression, doc);

    switch (op) {
      case '$sum':
        return this.accumulatorSum(documents, expression);
      case '$avg':
        return this.accumulatorAvg(documents, expression);
      case '$min':
        return this.accumulatorMin(documents, expression);
      case '$max':
        return this.accumulatorMax(documents, expression);
      case '$count':
        return documents.length;
      case '$first':
        return documents.length > 0 ? evaluate(documents[0]) : null;
      case '$last':
        return documents.length > 0 ? evaluate(documents[documents.length - 1]) : null;
      case '$push':
        return documents.map(evaluate);
      case '$addToSet': {
        // Deduplicate on the JSON form but return the evaluated values themselves.
        const seen = new Set();
        const unique = [];
        for (const doc of documents) {
          const value = evaluate(doc);
          if (value === undefined) continue; // missing field contributes nothing
          const key = JSON.stringify(value);
          if (!seen.has(key)) {
            seen.add(key);
            unique.push(value);
          }
        }
        return unique;
      }
      case '$stdDevPop':
        return this.accumulatorStdDev(documents, expression, false);
      case '$stdDevSamp':
        return this.accumulatorStdDev(documents, expression, true);
      default:
        throw new Error(`Unknown accumulator: ${op}`);
    }
  }

  /** Evaluate `expression` for every document and keep only `number` results. */
  numericValues(documents, expression) {
    return documents
      .map(doc => this.expressionEvaluator.evaluate(expression, doc))
      .filter(value => typeof value === 'number');
  }

  /** Sum of the numeric values; non-numeric values count as 0. `$sum: 1` counts documents. */
  accumulatorSum(documents, expression) {
    if (expression === 1) return documents.length;
    return this.numericValues(documents, expression).reduce((sum, value) => sum + value, 0);
  }

  /** Mean of the numeric values, or null when there are none. */
  accumulatorAvg(documents, expression) {
    const values = this.numericValues(documents, expression);
    if (values.length === 0) return null;
    return values.reduce((sum, value) => sum + value, 0) / values.length;
  }

  /**
   * Pick the extreme non-null value using `isBetter(candidate, best)`; null when none exists.
   */
  accumulatorExtreme(documents, expression, isBetter) {
    let best = null;
    let found = false;
    for (const doc of documents) {
      const value = this.expressionEvaluator.evaluate(expression, doc);
      if (value === null || value === undefined) continue;
      if (!found || isBetter(value, best)) {
        best = value;
        found = true;
      }
    }
    return best;
  }

  /** Smallest non-null value (numbers or strings), or null when there is none. */
  accumulatorMin(documents, expression) {
    return this.accumulatorExtreme(documents, expression, (value, best) => value < best);
  }

  /** Largest non-null value (numbers or strings), or null when there is none. */
  accumulatorMax(documents, expression) {
    return this.accumulatorExtreme(documents, expression, (value, best) => value > best);
  }

  /**
   * Standard deviation of the numeric values.
   *
   * @param {boolean} sample - true for the sample form (n - 1 divisor), false for population.
   * @returns {number|null} null when there are too few numeric values to compute a result.
   */
  accumulatorStdDev(documents, expression, sample) {
    const values = this.numericValues(documents, expression);
    if (values.length === 0) return null;
    if (sample && values.length < 2) return null;

    const mean = values.reduce((sum, value) => sum + value, 0) / values.length;
    const squaredError = values.reduce((sum, value) => sum + Math.pow(value - mean, 2), 0);
    return Math.sqrt(squaredError / (sample ? values.length - 1 : values.length));
  }

  /**
   * Helper methods
   */

  /**
   * Test a document against a query filter.
   *
   * Supports field conditions plus the top-level `$and`, `$or` and `$nor` operators.
   * Simple filter matching (would use full query engine in real implementation).
   *
   * @throws {Error} When a condition uses an unsupported query operator.
   */
  matchesFilter(doc, filter) {
    for (const [field, condition] of Object.entries(filter)) {
      const matchesSub = sub => this.matchesFilter(doc, sub);

      if (field === '$and') {
        if (!condition.every(matchesSub)) return false;
      } else if (field === '$or') {
        if (!condition.some(matchesSub)) return false;
      } else if (field === '$nor') {
        if (condition.some(matchesSub)) return false;
      } else if (!this.matchesCondition(this.getFieldValue(doc, field), condition)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Test one field value against one condition.
   *
   * A plain object whose first key starts with `$` is an operator document; a RegExp
   * is tested against string values; anything else is compared for equality.
   *
   * @throws {Error} When an operator document contains an unsupported operator.
   */
  matchesCondition(value, condition) {
    if (condition instanceof RegExp) {
      condition.lastIndex = 0; // global/sticky regexes keep state between calls
      return typeof value === 'string' && condition.test(value);
    }

    const operators = isPlainObject(condition) ? Object.keys(condition) : [];
    if (operators.length === 0 || !operators[0].startsWith('$')) {
      return valuesEqual(value, condition);
    }

    // Range operators never match a missing or null field.
    const present = value !== null && value !== undefined;

    for (const op of operators) {
      const expected = condition[op];
      switch (op) {
        case '$eq':
          if (!valuesEqual(value, expected)) return false;
          break;
        case '$ne':
          if (valuesEqual(value, expected)) return false;
          break;
        case '$gt':
          if (!present || !(value > expected)) return false;
          break;
        case '$gte':
          if (!present || !(value >= expected)) return false;
          break;
        case '$lt':
          if (!present || !(value < expected)) return false;
          break;
        case '$lte':
          if (!present || !(value <= expected)) return false;
          break;
        case '$in':
          if (!expected.some(candidate => valuesEqual(value, candidate))) return false;
          break;
        case '$nin':
          if (expected.some(candidate => valuesEqual(value, candidate))) return false;
          break;
        case '$exists':
          if ((value !== undefined) !== Boolean(expected)) return false;
          break;
        case '$not':
          if (this.matchesCondition(value, expected)) return false;
          break;
        default:
          throw new Error(`Unknown query operator: ${op}`);
      }
    }
    return true;
  }

  /**
   * Apply a projection to one document.
   *
   * - All specs `0`/`false`: copy the document and remove the listed fields.
   * - Otherwise: build a new document from the included fields (`1`/`true`) and
   *   computed expressions. `_id` is carried over unless excluded. Fields that
   *   resolve to `undefined` are omitted. Dotted names produce embedded documents.
   */
  projectDocument(doc, projection) {
    const isExcluded = spec => spec === 0 || spec === false;
    const isIncluded = spec => spec === 1 || spec === true;
    const entries = Object.entries(projection);

    if (entries.length > 0 && entries.every(([, spec]) => isExcluded(spec))) {
      return entries.reduce(
        (result, [field]) => unsetPathCopy(result, splitWritablePath(field)),
        { ...doc }
      );
    }

    let result = {};
    const idExcluded = hasOwn(projection, '_id') && isExcluded(projection._id);
    if (!idExcluded && doc !== null && doc !== undefined && doc._id !== undefined) {
      result._id = doc._id;
    }

    for (const [field, spec] of entries) {
      if (isExcluded(spec)) continue;

      const value = isIncluded(spec)
        ? this.getFieldValue(doc, field)
        : this.expressionEvaluator.evaluate(spec, doc);
      if (value === undefined) continue;

      result = setPathCopy(result, splitWritablePath(field), value);
    }

    return result;
  }

  /**
   * Read a dotted path from a document; `$$ROOT` and `$$CURRENT` return the document.
   * Returns undefined as soon as an intermediate value is null or undefined.
   */
  getFieldValue(doc, path) {
    if (path === '$$ROOT' || path === '$$CURRENT') return doc;

    let value = doc;
    for (const part of path.split('.')) {
      if (value == null) return undefined;
      value = value[part];
    }
    return value;
  }

  /**
   * Write a dotted path on `obj` in place, creating intermediate objects as needed.
   *
   * @throws {Error} When the path contains `__proto__`.
   */
  setFieldValue(obj, path, value) {
    const parts = splitWritablePath(path);
    const last = parts.pop();

    let current = obj;
    for (const part of parts) {
      // Only descend into the object's own data, never into inherited members.
      if (!hasOwn(current, part) || !current[part]) {
        current[part] = {};
      }
      current = current[part];
    }
    current[last] = value;
  }

  /**
   * Delete a dotted path on `obj` in place; does nothing when the path does not exist.
   */
  deleteFieldValue(obj, path) {
    const parts = path.split('.');
    const last = parts.pop();

    let current = obj;
    for (const part of parts) {
      if (!current[part]) return;
      current = current[part];
    }
    delete current[last];
  }

  /**
   * Get statistics
   *
   * @returns {object} A shallow copy of the running counters.
   */
  getStats() {
    return { ...this.stats };
  }
}

/**
 * Expression Evaluator for aggregation expressions
 */
class ExpressionEvaluator {
  /**
   * Evaluate an aggregation expression against a document.
   *
   * - `"$path"` strings read a field; `$$ROOT` / `$$CURRENT` (optionally followed by
   *   `.path`) refer to the document itself.
   * - A plain object with a single `$operator` key is an operator call.
   * - Other plain objects and arrays are evaluated member by member.
   * - Everything else (numbers, booleans, Dates, class instances, ...) is a literal.
   */
  evaluate(expression, document) {
    if (expression === null || expression === undefined) {
      return expression;
    }

    if (typeof expression === 'string') {
      if (!expression.startsWith('$')) return expression;

      if (expression.startsWith('$$')) {
        const [variable, ...rest] = expression.substring(2).split('.');
        if (variable === 'ROOT' || variable === 'CURRENT') {
          return rest.length === 0 ? document : this.getFieldValue(document, rest.join('.'));
        }
      }
      return this.getFieldValue(document, expression.substring(1));
    }

    if (Array.isArray(expression)) {
      return expression.map(item => this.evaluate(item, document));
    }

    // Only object literals are expression documents; Dates and the like are values.
    if (isPlainObject(expression)) {
      const keys = Object.keys(expression);
      if (keys.length === 1 && keys[0].startsWith('$')) {
        return this.evaluateOperator(keys[0], expression[keys[0]], document);
      }

      const result = {};
      for (const key of keys) {
        result[key] = this.evaluate(expression[key], document);
      }
      return result;
    }

    // Literal value
    return expression;
  }

  /**
   * Evaluate a single operator expression.
   *
   * @param {string} operator - Operator name including the leading `$`.
   * @param {*} operands - The operator's argument(s), unevaluated.
   * @param {object} document - Document the operands are resolved against.
   * @throws {Error} When the operator is not supported.
   */
  evaluateOperator(operator, operands, document) {
    const resolve = operand => this.evaluate(operand, document);

    switch (operator) {
      // Arithmetic
      case '$add':
        return operands.reduce((sum, operand) => sum + resolve(operand), 0);
      case '$subtract': {
        const [minuend, subtrahend] = operands;
        return resolve(minuend) - resolve(subtrahend);
      }
      case '$multiply':
        return operands.reduce((product, operand) => product * resolve(operand), 1);
      case '$divide': {
        const [dividend, divisor] = operands;
        return resolve(dividend) / resolve(divisor);
      }
      case '$mod': {
        const [dividend, divisor] = operands;
        return resolve(dividend) % resolve(divisor);
      }

      // String
      case '$concat': {
        const pieces = operands.map(resolve);
        // A null or missing piece makes the whole result null.
        if (pieces.some(piece => piece === null || piece === undefined)) return null;
        return pieces.map(String).join('');
      }
      case '$substr': {
        const [source, startExpr, lengthExpr] = operands;
        const value = resolve(source);
        const text = value === null || value === undefined ? '' : String(value);
        const start = resolve(startExpr);
        const length = resolve(lengthExpr);

        if (start < 0) return '';
        if (length === null || length === undefined || length < 0) {
          return text.substring(start); // rest of the string
        }
        return text.substring(start, start + length);
      }
      case '$toLower':
      case '$toUpper': {
        const value = resolve(unaryOperand(operands));
        const text = value === null || value === undefined ? '' : String(value);
        return operator === '$toLower' ? text.toLowerCase() : text.toUpperCase();
      }

      // Comparison
      case '$eq':
        return valuesEqual(resolve(operands[0]), resolve(operands[1]));
      case '$ne':
        return !valuesEqual(resolve(operands[0]), resolve(operands[1]));
      case '$gt':
        return resolve(operands[0]) > resolve(operands[1]);
      case '$gte':
        return resolve(operands[0]) >= resolve(operands[1]);
      case '$lt':
        return resolve(operands[0]) < resolve(operands[1]);
      case '$lte':
        return resolve(operands[0]) <= resolve(operands[1]);

      // Logical
      case '$and':
        return operands.every(operand => resolve(operand));
      case '$or':
        return operands.some(operand => resolve(operand));
      case '$not':
        return !resolve(unaryOperand(operands));

      // Conditional
      case '$cond': {
        // Both `{ if, then, else }` and `[if, then, else]` are accepted.
        const [condition, thenBranch, elseBranch] = Array.isArray(operands)
          ? operands
          : [operands.if, operands.then, operands.else];
        return resolve(condition) ? resolve(thenBranch) : resolve(elseBranch);
      }
      case '$ifNull': {
        const [candidate, replacement] = operands;
        const value = resolve(candidate);
        return value === null || value === undefined ? resolve(replacement) : value;
      }

      // Array
      case '$size': {
        const value = resolve(unaryOperand(operands));
        return Array.isArray(value) ? value.length : 0;
      }
      case '$arrayElemAt': {
        const [arrayExpr, indexExpr] = operands;
        const array = resolve(arrayExpr);
        const index = resolve(indexExpr);
        if (!Array.isArray(array)) return undefined;
        // Negative indexes count from the end of the array.
        return index < 0 ? array[array.length + index] : array[index];
      }

      default: {
        // Date parts, read in UTC so results do not depend on the host timezone.
        const getPart = DATE_PART_GETTERS.get(operator);
        if (!getPart) {
          throw new Error(`Unknown operator: ${operator}`);
        }
        const value = resolve(unaryOperand(operands));
        if (value === null || value === undefined) return null;
        return getPart(new Date(value));
      }
    }
  }

  /**
   * Read a dotted path from a document, stopping with undefined at the first
   * null or undefined intermediate value.
   */
  getFieldValue(doc, path) {
    let value = doc;
    for (const part of path.split('.')) {
      if (value == null) return undefined;
      value = value[part];
    }
    return value;
  }
}

export default AggregationPipeline;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/smallmindsco/MongTap'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.