Skip to main content
Glama
query-engine.js (26.3 kB)
/**
 * Query Engine with DataFlood Integration
 *
 * This query engine handles MongoDB queries by:
 * 1. Parsing query constraints
 * 2. Generating documents from DataFlood models
 * 3. Filtering generated documents based on constraints
 * 4. Supporting MongoDB query operators
 */

import { EventEmitter } from 'events';

/** Top-level logical operators that combine sub-queries rather than test a value. */
const LOGICAL_OPERATORS = ['$and', '$or', '$nor'];

/**
 * True for object literals and null-prototype objects; false for arrays,
 * Dates, RegExps, class instances and primitives.
 */
function isPlainObject(value) {
  if (value === null || typeof value !== 'object') return false;
  const proto = Object.getPrototypeOf(value);
  return proto === Object.prototype || proto === null;
}

/**
 * Three-way comparison for the range operators.
 * Returns -1, 0 or 1, or NaN when the operands cannot be ordered
 * (either side is null/undefined, or the relational operators all fail).
 * Dates compare through their numeric time value.
 */
function compareValues(a, b) {
  if (a == null || b == null) return NaN;
  if (a < b) return -1;
  if (a > b) return 1;
  if (a >= b) return 0;
  return NaN;
}

/**
 * Comparator used for sorting: null/undefined order before any other value so
 * the comparator stays consistent when a sort field is missing.
 */
function compareForSort(a, b) {
  const aMissing = a == null;
  const bMissing = b == null;
  if (aMissing || bMissing) {
    if (aMissing && bMissing) return 0;
    return aMissing ? -1 : 1;
  }
  if (a < b) return -1;
  if (a > b) return 1;
  return 0;
}

/**
 * MongoDB Query Engine with DataFlood document generation
 */
export class QueryEngine extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.maxGeneratedDocs=10000] - Upper bound on documents requested per query.
   * @param {number} [options.batchSize=100]
   * @param {number} [options.timeout=30000]
   * @param {boolean} [options.constraintOptimization=true] - Pass query constraints to the generator.
   * @param {boolean} [options.cacheResults=true] - Cache query results by collection/query/options.
   * @param {object} [options.logger] - Object with debug/info/warn/error methods.
   */
  constructor(options = {}) {
    super();

    this.options = {
      maxGeneratedDocs: options.maxGeneratedDocs || 10000,
      batchSize: options.batchSize || 100,
      timeout: options.timeout || 30000,
      constraintOptimization: options.constraintOptimization !== false,
      cacheResults: options.cacheResults !== false,
    };

    // Result cache
    this.cache = new Map();
    this.cacheHits = 0;
    this.cacheMisses = 0;

    // Statistics
    this.stats = {
      queries: 0,
      totalGenerated: 0,
      totalFiltered: 0,
      averageGenerationTime: 0,
      averageFilterTime: 0,
    };
    // Number of samples folded into the running averages. Kept separately from
    // stats.queries because cache hits count as queries but are never timed.
    this.timedQueries = 0;

    // Logging
    this.logger = options.logger || this.createDefaultLogger();
  }

  /**
   * Build the fallback logger: debug is a no-op, info and error go through
   * console.error, warn through console.warn.
   */
  createDefaultLogger() {
    return {
      debug: () => {},
      info: console.error,
      warn: console.warn,
      error: console.error,
    };
  }

  /**
   * Execute a query against a collection.
   *
   * Order of operations: generate -> filter -> sort -> skip/limit -> project.
   * Sorting happens before skip/limit so that "top N" queries return the
   * correct documents.
   *
   * @param {object} collection - Must expose fullName, database, name and storage.generateDocuments().
   * @param {object} [query={}] - MongoDB-style filter.
   * @param {object} [options={}] - Supports limit (default 100), skip (default 0), sort, projection.
   * @returns {Promise<object[]>} Matching documents.
   * @throws Re-throws any error raised during generation or filtering after logging it.
   */
  async executeQuery(collection, query = {}, options = {}) {
    this.stats.queries++;

    // Check cache
    const cacheKey = this.getCacheKey(collection.fullName, query, options);
    if (this.options.cacheResults && this.cache.has(cacheKey)) {
      this.cacheHits++;
      this.logger.debug(`Cache hit for query ${cacheKey}`);
      return this.cache.get(cacheKey);
    }
    this.cacheMisses++;

    try {
      // Constraints are only a generation hint; filtering below is authoritative.
      const constraints = this.options.constraintOptimization
        ? this.extractConstraints(query)
        : {};

      const limit = options.limit || 100;
      const skip = options.skip || 0;
      // Never request more than the configured cap, whatever the caller asks for.
      const generateCount = Math.min(limit + skip, this.options.maxGeneratedDocs);

      const generationStart = Date.now();
      const generatedDocs = await this.generateDocuments(collection, generateCount, constraints);
      const generationTime = Date.now() - generationStart;

      const filterStart = Date.now();
      const filteredDocs = this.filterDocuments(generatedDocs, query);
      const filterTime = Date.now() - filterStart;

      let results = filteredDocs;
      if (options.sort) {
        results = this.sortDocuments(results, options.sort);
      }
      results = results.slice(skip, skip + limit);
      if (options.projection) {
        results = this.projectDocuments(results, options.projection);
      }

      this.updateStats(generatedDocs.length, filteredDocs.length, generationTime, filterTime);

      if (this.options.cacheResults) {
        this.cache.set(cacheKey, results);
        // Limit cache size: evict the oldest inserted entry.
        if (this.cache.size > 1000) {
          const firstKey = this.cache.keys().next().value;
          this.cache.delete(firstKey);
        }
      }

      this.logger.debug(
        `Query executed: generated=${generatedDocs.length}, ` +
          `filtered=${filteredDocs.length}, returned=${results.length}`
      );

      return results;
    } catch (error) {
      this.logger.error('Query execution error:', error);
      throw error;
    }
  }

  /**
   * Extract constraints from a MongoDB query for optimized generation.
   *
   * Only top-level field conditions are inspected; logical operators are
   * skipped. All recognised operators on one field are merged into a single
   * constraint object, so `{ $gt: 1, $ne: 5 }` keeps both the range and the
   * exclusion.
   *
   * @param {object} query
   * @returns {Object<string, object>} Map of field path to constraint object.
   */
  extractConstraints(query) {
    const constraints = {};
    const merge = (field, patch) => {
      constraints[field] = { ...constraints[field], ...patch };
      return constraints[field];
    };

    for (const [field, value] of Object.entries(query || {})) {
      if (field.startsWith('$')) {
        // Logical operator
        continue;
      }

      if (value instanceof Date) {
        // A Date is a direct-equality value, not an operator document.
        constraints[field] = { equals: value };
        continue;
      }

      if (typeof value !== 'object' || value === null || Array.isArray(value)) {
        // Direct equality
        constraints[field] = { equals: value };
        continue;
      }

      for (const [op, val] of Object.entries(value)) {
        switch (op) {
          case '$eq':
            merge(field, { equals: val });
            break;
          case '$ne':
            merge(field, { notEquals: val });
            break;
          case '$gt':
            merge(field, { min: val, excludeMin: true });
            break;
          case '$gte':
            // Drop a stale exclusive flag left by an earlier $gt on this field.
            delete merge(field, { min: val }).excludeMin;
            break;
          case '$lt':
            merge(field, { max: val, excludeMax: true });
            break;
          case '$lte':
            delete merge(field, { max: val }).excludeMax;
            break;
          case '$in':
            merge(field, { values: val });
            break;
          case '$nin':
            merge(field, { excludeValues: val });
            break;
          case '$regex':
            merge(field, { pattern: val });
            break;
          case '$exists':
            merge(field, { exists: val });
            break;
          default:
            // Other operators carry no generation hint.
            break;
        }
      }
    }

    return constraints;
  }

  /**
   * Generate documents from the DataFlood model with constraints by
   * delegating to collection.storage.generateDocuments().
   *
   * @param {object} collection
   * @param {number} count - Number of documents requested.
   * @param {object} constraints - Output of extractConstraints().
   * @returns {Promise<object[]>}
   */
  async generateDocuments(collection, count, constraints) {
    const genStart = Date.now();

    const documents = await collection.storage.generateDocuments(
      collection.database,
      collection.name,
      count,
      { constraints }
    );

    const genTime = Date.now() - genStart;
    this.logger.debug(`Generated ${documents.length} documents in ${genTime}ms`);

    return documents;
  }

  /**
   * Filter documents based on a MongoDB query.
   * An empty or missing query returns the input array unchanged.
   *
   * @param {object[]} documents
   * @param {object} query
   * @returns {object[]}
   */
  filterDocuments(documents, query) {
    if (!query || Object.keys(query).length === 0) {
      return documents;
    }

    const filterStart = Date.now();
    const filtered = documents.filter((doc) => this.matchesQuery(doc, query));
    const filterTime = Date.now() - filterStart;

    this.logger.debug(`Filtered to ${filtered.length} documents in ${filterTime}ms`);
    return filtered;
  }

  /**
   * Check if a document matches a query. Every top-level entry must match.
   *
   * @param {object} doc
   * @param {object} query
   * @returns {boolean}
   */
  matchesQuery(doc, query) {
    for (const [field, condition] of Object.entries(query)) {
      switch (field) {
        case '$and':
          if (!condition.every((subQuery) => this.matchesQuery(doc, subQuery))) return false;
          break;
        case '$or':
          if (!condition.some((subQuery) => this.matchesQuery(doc, subQuery))) return false;
          break;
        case '$nor':
          if (condition.some((subQuery) => this.matchesQuery(doc, subQuery))) return false;
          break;
        case '$not':
          // Kept for backward compatibility: negates a whole sub-query.
          if (this.matchesQuery(doc, condition)) return false;
          break;
        default: {
          const fieldValue = this.getFieldValue(doc, field);
          if (!this.matchesCondition(fieldValue, condition)) return false;
        }
      }
    }
    return true;
  }

  /**
   * Get a nested field value from a document using a dot-separated path.
   * Returns undefined as soon as an intermediate value is null/undefined.
   */
  getFieldValue(doc, path) {
    let value = doc;
    for (const part of path.split('.')) {
      if (value == null) {
        return undefined;
      }
      value = value[part];
    }
    return value;
  }

  /**
   * Check if a value matches a condition.
   *
   * A condition is either a literal (primitive, array, Date, RegExp, or an
   * embedded document without `$` keys) or an operator document. Unknown
   * operators are ignored.
   *
   * @param {*} value - Field value taken from the document.
   * @param {*} condition
   * @returns {boolean}
   */
  matchesCondition(value, condition) {
    if (condition === null || condition === undefined) {
      // Loose equality on purpose: a null condition also matches a missing field.
      return value == condition;
    }
    if (condition instanceof RegExp) {
      return this.matchesRegex(value, condition);
    }
    if (
      typeof condition !== 'object' ||
      Array.isArray(condition) ||
      condition instanceof Date
    ) {
      return this.equals(value, condition);
    }

    const operators = Object.entries(condition);
    if (!operators.some(([op]) => op.startsWith('$'))) {
      // Embedded-document equality, e.g. { address: { city: 'X' } }.
      return this.equals(value, condition);
    }

    for (const [op, expected] of operators) {
      switch (op) {
        case '$eq':
          if (!this.equals(value, expected)) return false;
          break;
        case '$ne':
          if (this.equals(value, expected)) return false;
          break;
        // Range operators are written positively so that missing or
        // incomparable values (comparison result NaN) never match.
        case '$gt':
          if (!(compareValues(value, expected) > 0)) return false;
          break;
        case '$gte':
          if (!(compareValues(value, expected) >= 0)) return false;
          break;
        case '$lt':
          if (!(compareValues(value, expected) < 0)) return false;
          break;
        case '$lte':
          if (!(compareValues(value, expected) <= 0)) return false;
          break;
        case '$in':
          if (!expected.some((candidate) => this.equals(value, candidate))) return false;
          break;
        case '$nin':
          if (expected.some((candidate) => this.equals(value, candidate))) return false;
          break;
        case '$exists':
          if ((value !== undefined) !== Boolean(expected)) return false;
          break;
        case '$type':
          if (!this.matchesType(value, expected)) return false;
          break;
        case '$regex':
          if (!this.matchesRegex(value, expected, condition.$options)) return false;
          break;
        case '$options':
          // Consumed by $regex.
          break;
        case '$not':
          if (this.matchesCondition(value, expected)) return false;
          break;
        case '$size':
          if (!Array.isArray(value) || value.length !== expected) return false;
          break;
        case '$all':
          if (
            !Array.isArray(value) ||
            !expected.every((wanted) => value.some((item) => this.equals(item, wanted)))
          ) {
            return false;
          }
          break;
        case '$elemMatch': {
          if (!Array.isArray(value)) return false;
          // { $gt: 5 } tests each element directly; { qty: { $gt: 5 } } is a
          // sub-query evaluated against each element document.
          const usesValueOperators =
            isPlainObject(expected) &&
            Object.keys(expected).some(
              (key) => key.startsWith('$') && !LOGICAL_OPERATORS.includes(key)
            );
          const matchesElement = usesValueOperators
            ? (element) => this.matchesCondition(element, expected)
            : (element) =>
                element !== null &&
                typeof element === 'object' &&
                this.matchesQuery(element, expected);
          if (!value.some(matchesElement)) return false;
          break;
        }
        default:
          // Unsupported operators are ignored rather than failing the match.
          break;
      }
    }

    return true;
  }

  /**
   * Compare values for equality: strict for primitives, by timestamp for
   * Dates, element-wise for arrays, and by own enumerable keys for objects.
   */
  equals(a, b) {
    if (a === b) return true;
    if (a == null || b == null) return false;

    if (a instanceof Date || b instanceof Date) {
      return a instanceof Date && b instanceof Date && a.getTime() === b.getTime();
    }

    if (Array.isArray(a) || Array.isArray(b)) {
      return (
        Array.isArray(a) &&
        Array.isArray(b) &&
        a.length === b.length &&
        a.every((item, index) => this.equals(item, b[index]))
      );
    }

    if (typeof a === 'object' && typeof b === 'object') {
      const keysA = Object.keys(a);
      if (keysA.length !== Object.keys(b).length) return false;
      // Require the key to exist on b so { x: undefined } != { y: undefined }.
      return keysA.every(
        (key) =>
          Object.prototype.hasOwnProperty.call(b, key) && this.equals(a[key], b[key])
      );
    }

    return false;
  }

  /**
   * Check if a value matches a $type name (case-insensitive string alias).
   * Non-string or unknown type specifiers never match.
   */
  matchesType(value, type) {
    if (typeof type !== 'string') return false;

    // A Map avoids resolving names such as 'constructor' via Object.prototype.
    const checkers = new Map([
      ['double', (v) => typeof v === 'number'],
      ['number', (v) => typeof v === 'number'],
      ['string', (v) => typeof v === 'string'],
      [
        'object',
        (v) =>
          typeof v === 'object' &&
          v !== null &&
          !Array.isArray(v) &&
          !(v instanceof Date) &&
          !(v instanceof RegExp),
      ],
      ['array', (v) => Array.isArray(v)],
      ['bool', (v) => typeof v === 'boolean'],
      ['null', (v) => v === null],
      ['int', (v) => Number.isInteger(v)],
      ['date', (v) => v instanceof Date],
    ]);

    const checker = checkers.get(type.toLowerCase());
    return checker ? checker(value) : false;
  }

  /**
   * Check if a string value matches a regular expression.
   *
   * @param {*} value - Non-strings never match.
   * @param {string|RegExp} pattern
   * @param {string} [options=''] - Flags; when empty, the flags of a RegExp pattern are kept.
   * @returns {boolean} false when the pattern or flags are invalid.
   */
  matchesRegex(value, pattern, options = '') {
    if (typeof value !== 'string') return false;

    try {
      const isRegExp = pattern instanceof RegExp;
      const source = isRegExp ? pattern.source : pattern;
      // new RegExp(regex, '') would discard the literal's own flags.
      const flags = options || (isRegExp ? pattern.flags : '');
      // A fresh RegExp per call keeps /g and /y patterns stateless.
      return new RegExp(source, flags).test(value);
    } catch (e) {
      // Best effort: an invalid pattern matches nothing instead of failing the query.
      this.logger.debug(`Invalid regular expression ${String(pattern)}: ${e.message}`);
      return false;
    }
  }

  /**
   * Sort documents by a sort specification and return a new array; the input
   * array is left untouched.
   *
   * @param {object[]} documents
   * @param {Object<string, number>} sortSpec - Field path to direction; 1 is ascending, anything else descending.
   * @returns {object[]}
   */
  sortDocuments(documents, sortSpec) {
    const sortKeys = Object.entries(sortSpec);

    return [...documents].sort((a, b) => {
      for (const [field, direction] of sortKeys) {
        const order = compareForSort(
          this.getFieldValue(a, field),
          this.getFieldValue(b, field)
        );
        if (order !== 0) {
          return direction === 1 ? order : -order;
        }
      }
      return 0;
    });
  }

  /**
   * Project documents: apply projectDocument() to each.
   */
  projectDocuments(documents, projection) {
    return documents.map((doc) => this.projectDocument(doc, projection));
  }

  /**
   * Project a single document without modifying it.
   *
   * Inclusion mode is selected when any projection value is 1 (or true);
   * otherwise fields flagged 0 (or false) are removed from a copy.
   */
  projectDocument(doc, projection) {
    const isIncluded = (flag) => flag === 1 || flag === true;
    const isExcluded = (flag) => flag === 0 || flag === false;

    const result = {};
    const includeMode = Object.values(projection).some(isIncluded);

    if (includeMode) {
      // Include only specified fields
      for (const [field, flag] of Object.entries(projection)) {
        if (!isIncluded(flag)) continue;
        const value = this.getFieldValue(doc, field);
        if (value !== undefined) {
          this.setFieldValue(result, field, value);
        }
      }

      // Always include _id unless explicitly excluded
      if (!isExcluded(projection._id) && doc._id !== undefined) {
        result._id = doc._id;
      }
      return result;
    }

    // Exclude specified fields
    Object.assign(result, doc);
    for (const [field, flag] of Object.entries(projection)) {
      if (isExcluded(flag)) {
        this.removeFieldFromCopy(result, field);
      }
    }
    return result;
  }

  /**
   * Delete a dotted path from a shallow copy, cloning each container on the
   * way down so the object the copy was made from is never modified.
   */
  removeFieldFromCopy(copy, path) {
    const parts = path.split('.');
    const last = parts.pop();
    let current = copy;

    for (const part of parts) {
      const child = current[part];
      let clone;
      if (Array.isArray(child)) {
        clone = [...child];
      } else if (isPlainObject(child)) {
        clone = { ...child };
      } else {
        // Missing or not a container: nothing to delete below it.
        return;
      }
      current[part] = clone;
      current = clone;
    }

    delete current[last];
  }

  /**
   * Return a shallow copy of doc with the dotted path set to value, cloning
   * each container on the way down so doc itself is never modified.
   */
  copyWithFieldValue(doc, path, value) {
    const parts = path.split('.');
    const last = parts.pop();
    const root = { ...doc };
    let current = root;

    for (const part of parts) {
      const child = current[part];
      let clone;
      if (Array.isArray(child)) {
        clone = [...child];
      } else if (child !== null && typeof child === 'object') {
        clone = { ...child };
      } else {
        clone = {};
      }
      current[part] = clone;
      current = clone;
    }

    current[last] = value;
    return root;
  }

  /**
   * Set a nested field value in place, creating intermediate objects as needed.
   */
  setFieldValue(obj, path, value) {
    const parts = path.split('.');
    const last = parts.pop();
    let current = obj;

    for (const part of parts) {
      if (!current[part]) {
        current[part] = {};
      }
      current = current[part];
    }

    current[last] = value;
  }

  /**
   * Delete a nested field value in place; a missing intermediate is a no-op.
   */
  deleteFieldValue(obj, path) {
    const parts = path.split('.');
    const last = parts.pop();
    let current = obj;

    for (const part of parts) {
      if (!current[part]) return;
      current = current[part];
    }

    delete current[last];
  }

  /**
   * Generate the cache key for a query.
   * RegExp values are serialised explicitly; plain JSON.stringify would turn
   * every RegExp into `{}` and make different regex queries collide.
   */
  getCacheKey(collection, query, options) {
    const replacer = (key, value) =>
      value instanceof RegExp ? `__regexp__:${value.toString()}` : value;
    return JSON.stringify({ collection, query, options }, replacer);
  }

  /**
   * Update statistics with one timed (non-cached) query.
   *
   * @param {number} generated - Documents generated.
   * @param {number} filtered - Documents remaining after filtering.
   * @param {number} time - Generation time in ms.
   * @param {number} [filterTime=time] - Filter time in ms; defaults to `time`
   *   for callers that still pass a single duration.
   */
  updateStats(generated, filtered, time, filterTime = time) {
    this.stats.totalGenerated += generated;
    this.stats.totalFiltered += filtered;

    this.timedQueries = (this.timedQueries || 0) + 1;
    const n = this.timedQueries;

    // Incremental running mean: avg += (sample - avg) / n.
    this.stats.averageGenerationTime += (time - this.stats.averageGenerationTime) / n;
    this.stats.averageFilterTime += (filterTime - this.stats.averageFilterTime) / n;
  }

  /**
   * Clear cache and reset the hit/miss counters.
   */
  clearCache() {
    this.cache.clear();
    this.cacheHits = 0;
    this.cacheMisses = 0;
  }

  /**
   * Get statistics, including cache counters and hit rate (0 when no lookups yet).
   */
  getStats() {
    return {
      ...this.stats,
      cacheHits: this.cacheHits,
      cacheMisses: this.cacheMisses,
      cacheHitRate: this.cacheHits / (this.cacheHits + this.cacheMisses) || 0,
      cacheSize: this.cache.size,
    };
  }

  /**
   * Execute an aggregation pipeline.
   *
   * Documents are obtained through collection.find() and each stage is applied
   * in order. Unsupported stages are logged and skipped.
   *
   * @param {object} collection - Must expose find() and fullName.
   * @param {object[]} pipeline
   * @param {object} [options={}] - Currently unused.
   * @returns {Promise<object[]>}
   */
  async executeAggregation(collection, pipeline, options = {}) {
    // Check if this is just a stats query (for Compass UI)
    const isStatsQuery = pipeline.some(
      (stage) => stage.$collStats || stage.$count || stage.$indexStats
    );

    // For stats queries, use a smaller sample; for data queries use more
    const limit = isStatsQuery ? 100 : this.options.maxGeneratedDocs;

    // Use collection's find method instead of executeQuery to avoid duplicate generation
    let documents = await collection.find({}, { limit });

    for (const stage of pipeline) {
      const stageEntries = Object.entries(stage || {});
      if (stageEntries.length === 0) {
        this.logger.warn('Skipping empty aggregation stage');
        continue;
      }
      const [stageName, stageConfig] = stageEntries[0];

      switch (stageName) {
        case '$match':
          documents = this.filterDocuments(documents, stageConfig);
          break;
        case '$project':
          documents = this.projectDocuments(documents, stageConfig);
          break;
        case '$sort':
          documents = this.sortDocuments(documents, stageConfig);
          break;
        case '$limit':
          documents = documents.slice(0, stageConfig);
          break;
        case '$skip':
          documents = documents.slice(stageConfig);
          break;
        case '$group':
          documents = this.groupDocuments(documents, stageConfig);
          break;
        case '$unwind':
          documents = this.unwindDocuments(documents, stageConfig);
          break;
        case '$collStats': {
          // Return collection statistics for MongoDB Compass
          const docCount = 100; // Fixed count for UI display
          documents = [
            {
              ns: collection.fullName,
              localTime: new Date(),
              latencyStats: {
                reads: { latency: 0, ops: 0 },
                writes: { latency: 0, ops: 0 },
              },
              storageStats: {
                size: docCount * 1000,
                count: docCount,
                avgObjSize: 1000,
                storageSize: docCount * 1200,
                totalIndexSize: 8192,
                totalSize: docCount * 1200 + 8192,
              },
              count: docCount,
            },
          ];
          break;
        }
        case '$count': {
          // Return document count
          const countField = typeof stageConfig === 'string' ? stageConfig : 'count';
          documents = [{ [countField]: documents.length }];
          break;
        }
        default:
          this.logger.warn(`Unsupported aggregation stage: ${stageName}`);
      }
    }

    return documents;
  }

  /**
   * Group documents for aggregation.
   *
   * Groups are bucketed by a serialised form of the group id so that Dates and
   * compound ids with equal content land in the same group, while the emitted
   * `_id` keeps its real value (an object for compound ids).
   *
   * @param {object[]} documents
   * @param {object} groupSpec - `$group` specification including `_id`.
   * @returns {object[]} One result document per group, in first-seen order.
   */
  groupDocuments(documents, groupSpec) {
    const groups = new Map();

    for (const doc of documents) {
      const id = this.resolveGroupId(doc, groupSpec._id);
      // typeof prefix keeps 1 and "1" in different buckets.
      const bucketKey = `${typeof id}:${JSON.stringify(id)}`;
      let group = groups.get(bucketKey);
      if (!group) {
        group = { id, docs: [] };
        groups.set(bucketKey, group);
      }
      group.docs.push(doc);
    }

    const results = [];
    for (const { id, docs: groupDocs } of groups.values()) {
      const result = { _id: id };

      for (const [field, spec] of Object.entries(groupSpec)) {
        if (field === '_id') continue;

        const accumulator = isPlainObject(spec) ? Object.entries(spec)[0] : undefined;
        if (!accumulator) {
          this.logger.warn(`Invalid accumulator for group field: ${field}`);
          continue;
        }
        const [op, expression] = accumulator;

        switch (op) {
          case '$sum':
            result[field] = this.calculateSum(groupDocs, expression);
            break;
          case '$avg':
            result[field] = this.calculateAvg(groupDocs, expression);
            break;
          case '$min':
            result[field] = this.calculateMin(groupDocs, expression);
            break;
          case '$max':
            result[field] = this.calculateMax(groupDocs, expression);
            break;
          case '$count':
            result[field] = groupDocs.length;
            break;
          case '$first':
            result[field] = this.resolveExpression(groupDocs[0], expression);
            break;
          case '$last':
            result[field] = this.resolveExpression(groupDocs[groupDocs.length - 1], expression);
            break;
          default:
            this.logger.warn(`Unsupported group accumulator: ${op}`);
        }
      }

      results.push(result);
    }

    return results;
  }

  /**
   * Resolve a simple expression: a string starting with `$` is read as a
   * field path on doc; anything else is returned as a literal.
   */
  resolveExpression(doc, expression) {
    if (typeof expression === 'string' && expression.startsWith('$')) {
      return this.getFieldValue(doc, expression.substring(1));
    }
    return expression;
  }

  /**
   * Resolve the `_id` value of a `$group` stage for one document.
   * Missing values become null; a compound spec yields a plain object.
   */
  resolveGroupId(doc, keySpec) {
    const orNull = (value) => (value === undefined ? null : value);

    if (keySpec == null) return null;

    if (typeof keySpec === 'object') {
      const id = {};
      for (const [field, expr] of Object.entries(keySpec)) {
        id[field] = orNull(this.resolveExpression(doc, expr));
      }
      return id;
    }

    return orNull(this.resolveExpression(doc, keySpec));
  }

  /**
   * Calculate a group key.
   *
   * Retained for backward compatibility: compound key specs are returned as a
   * JSON string. groupDocuments() uses resolveGroupId() instead so that the
   * emitted `_id` is a real object.
   */
  calculateGroupKey(doc, keySpec) {
    if (keySpec === null) return null;

    if (typeof keySpec === 'string' && keySpec.startsWith('$')) {
      return this.getFieldValue(doc, keySpec.substring(1));
    }

    if (typeof keySpec === 'object') {
      const key = {};
      for (const [field, expr] of Object.entries(keySpec)) {
        key[field] = this.resolveExpression(doc, expr);
      }
      return JSON.stringify(key);
    }

    return keySpec;
  }

  /**
   * Calculate sum for aggregation.
   * A numeric constant is added once per document; a `$field` expression sums
   * numeric field values and ignores everything else.
   */
  calculateSum(docs, expression) {
    if (typeof expression === 'number') return expression * docs.length;
    if (typeof expression !== 'string' || !expression.startsWith('$')) return 0;

    const field = expression.substring(1);
    return docs.reduce((sum, doc) => {
      const val = this.getFieldValue(doc, field);
      return sum + (typeof val === 'number' ? val : 0);
    }, 0);
  }

  /**
   * Calculate average for aggregation over numeric values only.
   * Returns null when there is nothing numeric to average.
   */
  calculateAvg(docs, expression) {
    if (typeof expression === 'number') return docs.length > 0 ? expression : null;
    if (typeof expression !== 'string' || !expression.startsWith('$')) return null;

    const field = expression.substring(1);
    let sum = 0;
    let count = 0;
    for (const doc of docs) {
      const val = this.getFieldValue(doc, field);
      if (typeof val === 'number') {
        sum += val;
        count++;
      }
    }
    return count > 0 ? sum / count : null;
  }

  /**
   * Calculate min for aggregation over numeric values; null when none exist.
   */
  calculateMin(docs, expression) {
    let min = Infinity;
    for (const doc of docs) {
      const val = this.resolveExpression(doc, expression);
      if (typeof val === 'number' && val < min) {
        min = val;
      }
    }
    return min === Infinity ? null : min;
  }

  /**
   * Calculate max for aggregation over numeric values; null when none exist.
   */
  calculateMax(docs, expression) {
    let max = -Infinity;
    for (const doc of docs) {
      const val = this.resolveExpression(doc, expression);
      if (typeof val === 'number' && val > max) {
        max = val;
      }
    }
    return max === -Infinity ? null : max;
  }

  /**
   * Unwind documents for aggregation.
   *
   * Emits one copy of each document per array element. Documents whose field
   * is not a non-empty array are dropped unless preserveNullAndEmptyArrays is
   * set. Source documents are never modified, including for dotted paths.
   *
   * @param {object[]} documents
   * @param {string|{path: string, preserveNullAndEmptyArrays?: boolean}} unwindSpec
   * @returns {object[]}
   * @throws {TypeError} When no field path string is supplied.
   */
  unwindDocuments(documents, unwindSpec) {
    const field = typeof unwindSpec === 'string' ? unwindSpec : unwindSpec && unwindSpec.path;
    if (typeof field !== 'string') {
      throw new TypeError('$unwind requires a field path string');
    }
    const fieldName = field.substring(1); // Remove $
    const preserve =
      typeof unwindSpec === 'object' && Boolean(unwindSpec.preserveNullAndEmptyArrays);

    const results = [];
    for (const doc of documents) {
      const array = this.getFieldValue(doc, fieldName);

      if (!Array.isArray(array) || array.length === 0) {
        if (preserve) {
          results.push(doc);
        }
        continue;
      }

      for (const item of array) {
        results.push(this.copyWithFieldValue(doc, fieldName, item));
      }
    }

    return results;
  }
}

export default QueryEngine;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/smallmindsco/MongTap'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.