Skip to main content
Glama
mongodb-server.js35.5 kB
/**
 * MongoDB Wire Protocol Server
 * TCP server that handles MongoDB protocol connections
 * Integrates with DataFlood for model-based data generation
 */

import net from 'net';
import { EventEmitter } from 'events';
import MessageHandler from '../protocol/message.js';
import { OpcodeParser, OpCode } from '../protocol/opcodes.js';
import DataFloodStorage from '../storage/dataflood-storage.js';
import { BSON, Long } from '../protocol/bson.js';
import { CRUDHandlers } from './crud-handlers.js';
import config from '../../config/config-loader.js';

/**
 * Size in bytes of the standard wire protocol message header
 * (messageLength, requestID, responseTo, opCode - four int32 values).
 * No valid message can be shorter than this.
 */
const MIN_MESSAGE_LENGTH = 16;

/**
 * Largest message accepted from a client. Also advertised to clients as
 * `maxMessageSizeBytes` in the isMaster/hello reply so both sides agree.
 */
const MAX_MESSAGE_LENGTH = 48000000;

/**
 * OP_MSG commands that have a dedicated handler method on MongoDBServer.
 * Keyed by command name (the first key of the command document).
 * A Map is used so inherited Object.prototype keys can never match.
 */
const OP_MSG_COMMAND_HANDLERS = new Map([
  ['insert', 'handleInsertCommand'],
  ['find', 'handleFindCommand'],
  ['update', 'handleUpdateCommand'],
  ['delete', 'handleDeleteCommand'],
  ['aggregate', 'handleAggregateCommand'],
  ['count', 'handleCountCommand'],
  ['createIndexes', 'handleCreateIndexCommand'],
  ['listIndexes', 'handleListIndexesCommand']
]);

/**
 * MongoDB-compatible server with DataFlood integration
 *
 * Events emitted: 'listening', 'closed', 'error', 'connection',
 * 'connectionError', 'connectionClosed', 'query', 'insert', 'update', 'delete'.
 */
export class MongoDBServer extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.port=27017] - TCP port to listen on
   * @param {string} [options.host='127.0.0.1'] - Interface to bind
   * @param {number} [options.maxConnections=1000] - Passed to net.Server#maxConnections
   * @param {object} [options.logger] - Logger with debug/info/warn/error methods
   * @param {object} [options.storage] - Pre-built storage; when given it is NOT initialized by start()
   * @param {string} [options.modelPath] - Base path for a newly created DataFloodStorage
   * @param {boolean} [options.enableAutoTrain] - Overrides config.server.enableAutoTrain
   * @param {number} [options.trainThreshold] - Overrides config.server.trainThreshold
   */
  constructor(options = {}) {
    super();

    // Store options for later use
    this.options = options;

    // Server configuration
    this.port = options.port || 27017;
    this.host = options.host || '127.0.0.1';
    this.maxConnections = options.maxConnections || 1000;

    // Components
    this.server = null;
    this.connections = new Map();
    this.messageHandler = new MessageHandler(options.logger);

    // Use provided storage or create new one
    this.storage = options.storage || new DataFloodStorage({
      basePath: options.modelPath || config.storage.modelsBasePath,
      logger: options.logger,
      // An explicit option (including `false`) wins, then the config value,
      // then the historical default of enabled.
      enableAutoTrain: options.enableAutoTrain ?? config.server.enableAutoTrain ?? true,
      trainThreshold: options.trainThreshold || config.server.trainThreshold
    });

    // CRUD handlers with DataFlood integration
    this.crudHandlers = new CRUDHandlers({
      storage: this.storage,
      messageHandler: this.messageHandler,
      logger: options.logger
    });

    // Logging
    this.logger = options.logger || this.createDefaultLogger();

    // Statistics
    this.stats = {
      connectionsTotal: 0,
      connectionsActive: 0,
      messagesReceived: 0,
      messagesSent: 0,
      bytesReceived: 0,
      bytesSent: 0,
      startTime: null
    };

    // Connection ID counter
    this.connectionIdCounter = 1;
  }

  /**
   * Build the fallback logger used when no logger option is supplied.
   * Every level is a no-op so the server never writes to stdout/stderr.
   *
   * @returns {{debug: Function, info: Function, warn: Function, error: Function}}
   */
  createDefaultLogger() {
    return {
      debug: () => {},
      info: () => {}, // Suppress in stdio mode
      warn: () => {}, // Suppress in stdio mode
      error: () => {} // Suppress errors - should never output to stderr
    };
  }

  /**
   * Start the MongoDB server
   *
   * @returns {Promise<void>} Resolves once the server is listening; rejects
   *   if storage initialization fails or the listener cannot be bound.
   */
  async start() {
    // Initialize storage only if it wasn't provided
    if (!this.options?.storage) {
      await this.storage.initialize();
    }

    return new Promise((resolve, reject) => {
      let listening = false;

      this.server = net.createServer();

      // Set max connections
      this.server.maxConnections = this.maxConnections;

      // Handle new connections
      this.server.on('connection', (socket) => {
        this.handleConnection(socket);
      });

      // Handle server errors
      this.server.on('error', (err) => {
        this.logger.error('Server error:', err);

        if (!listening) {
          // Startup failure: the caller learns about it through the rejected
          // promise. Emitting 'error' without a listener would throw inside
          // this handler and the promise would never settle, so only emit
          // when somebody is actually listening.
          reject(err);
          if (this.listenerCount('error') > 0) {
            this.emit('error', err);
          }
          return;
        }

        this.emit('error', err);
      });

      // Start listening
      this.server.listen(this.port, this.host, () => {
        listening = true;
        this.stats.startTime = new Date();
        this.logger.info(`MongoDB server listening on ${this.host}:${this.port}`);
        this.emit('listening', { host: this.host, port: this.port });
        resolve();
      });
    });
  }

  /**
   * Stop the MongoDB server
   *
   * Ends every tracked client socket, then closes the listener.
   *
   * @returns {Promise<void>} Resolves after the listener's close callback
   *   runs, or immediately when the server was never started.
   */
  async stop() {
    return new Promise((resolve) => {
      // Close all connections
      for (const conn of this.connections.values()) {
        conn.socket.end();
      }
      this.connections.clear();

      // Close server
      if (this.server) {
        this.server.close(() => {
          this.logger.info('MongoDB server stopped');
          this.emit('closed');
          resolve();
        });
      } else {
        resolve();
      }
    });
  }

  /**
   * Handle a new client connection
   *
   * Registers the connection, updates statistics and wires the socket's
   * data/error/close events.
   *
   * @param {net.Socket} socket - Newly accepted client socket
   */
  handleConnection(socket) {
    const connectionId = this.connectionIdCounter++;
    const remoteAddress = `${socket.remoteAddress}:${socket.remotePort}`;

    this.logger.info(`New connection ${connectionId} from ${remoteAddress}`);

    // Create connection object
    const connection = {
      id: connectionId,
      socket,
      remoteAddress,
      buffer: Buffer.alloc(0),
      database: 'test', // Default database
      authenticated: true, // No auth for now
      startTime: new Date(),
      stats: {
        messagesReceived: 0,
        messagesSent: 0,
        bytesReceived: 0,
        bytesSent: 0
      }
    };

    // Store connection
    this.connections.set(connectionId, connection);
    this.stats.connectionsTotal++;
    this.stats.connectionsActive++;

    // Emit connection event
    this.emit('connection', { id: connectionId, remoteAddress });

    // Handle socket events
    socket.on('data', (data) => {
      this.handleData(connection, data);
    });

    socket.on('error', (err) => {
      this.logger.error(`Connection ${connectionId} error:`, err);
      this.emit('connectionError', { id: connectionId, error: err });
    });

    socket.on('close', () => {
      this.logger.info(`Connection ${connectionId} closed`);
      this.connections.delete(connectionId);
      this.stats.connectionsActive--;
      this.emit('connectionClosed', { id: connectionId });
    });
  }

  /**
   * Handle incoming data from a connection
   *
   * Appends the chunk to the connection's buffer and dispatches every
   * complete message found in it. A declared message length outside
   * [MIN_MESSAGE_LENGTH, MAX_MESSAGE_LENGTH] is treated as a protocol
   * violation and the connection is closed.
   *
   * @param {object} connection - Connection record created by handleConnection
   * @param {Buffer} data - Chunk received from the socket
   */
  handleData(connection, data) {
    // Add to buffer
    connection.buffer = Buffer.concat([connection.buffer, data]);
    connection.stats.bytesReceived += data.length;
    this.stats.bytesReceived += data.length;

    // Process complete messages
    while (connection.buffer.length >= MIN_MESSAGE_LENGTH) {
      const messageLength = connection.buffer.readInt32LE(0);

      // The length prefix comes from the client and must not be trusted:
      // zero would make this loop spin forever, a negative value would slice
      // from the end of the buffer, and a huge value would let the client
      // make us buffer without bound.
      if (messageLength < MIN_MESSAGE_LENGTH || messageLength > MAX_MESSAGE_LENGTH) {
        const error = new Error(`Invalid message length: ${messageLength}`);
        this.logger.error(`Connection ${connection.id} error:`, error);
        this.emit('connectionError', { id: connection.id, error });
        connection.buffer = Buffer.alloc(0);
        connection.socket.destroy();
        return;
      }

      if (connection.buffer.length < messageLength) {
        // Wait for more data
        break;
      }

      // Extract message
      const messageBuffer = connection.buffer.subarray(0, messageLength);
      connection.buffer = connection.buffer.subarray(messageLength);

      // Process message. Processing is asynchronous and intentionally not
      // awaited here; the catch guarantees a failure while building the
      // error reply cannot surface as an unhandled rejection.
      Promise.resolve(this.processMessage(connection, messageBuffer)).catch((err) => {
        this.logger.error(`Error processing message:`, err);
      });
    }
  }

  /**
   * Process a complete MongoDB wire protocol message
   *
   * Parses the message, routes it by opcode and sends the handler's reply
   * (if any). Any thrown error is turned into an error reply addressed to
   * the originating request.
   *
   * @param {object} connection - Connection record
   * @param {Buffer} messageBuffer - Exactly one complete wire message
   * @returns {Promise<void>}
   */
  async processMessage(connection, messageBuffer) {
    connection.stats.messagesReceived++;
    this.stats.messagesReceived++;

    // requestID is the second int32 of the header. Reading it up front lets
    // the error reply reference the request even when parsing itself fails.
    let requestId = messageBuffer.length >= 8 ? messageBuffer.readInt32LE(4) : 0;

    try {
      // Parse message
      const message = OpcodeParser.parseMessage(messageBuffer);
      requestId = message.header.requestID;

      this.logger.debug(`Processing ${message.constructor.name} from connection ${connection.id}`);

      // Route based on opcode
      let response;

      switch (message.header.opCode) {
        case OpCode.OP_QUERY:
          response = await this.handleQuery(connection, message);
          break;

        case OpCode.OP_INSERT:
          response = await this.handleInsert(connection, message);
          break;

        case OpCode.OP_UPDATE:
          response = await this.handleUpdate(connection, message);
          break;

        case OpCode.OP_DELETE:
          response = await this.handleDelete(connection, message);
          break;

        case OpCode.OP_GET_MORE:
          response = await this.handleGetMore(connection, message);
          break;

        case OpCode.OP_KILL_CURSORS:
          response = await this.handleKillCursors(connection, message);
          break;

        case OpCode.OP_MSG:
          response = await this.handleMsg(connection, message);
          break;

        default:
          this.logger.warn(`Unsupported opcode: ${message.header.opCode}`);
          response = this.messageHandler.createErrorReply(
            requestId,
            `Unsupported operation: ${message.header.opCode}`,
            1
          );
      }

      // Send response if one was generated
      if (response) {
        this.sendResponse(connection, response);
      }
    } catch (err) {
      this.logger.error(`Error processing message:`, err);
      const errorResponse = this.messageHandler.createErrorReply(
        requestId,
        err.message,
        1
      );
      this.sendResponse(connection, errorResponse);
    }
  }

  /**
   * Handle OP_QUERY message
   *
   * Decodes the query (and optional projection) from the raw BSON buffer,
   * lets pre-parsed values on the message take precedence when they carry
   * content, forwards `<db>.$cmd` queries to handleCommand and everything
   * else to the CRUD find handler.
   *
   * @param {object} connection - Connection record
   * @param {object} message - Parsed OP_QUERY message
   * @returns {Promise<Buffer>} Encoded reply
   */
  async handleQuery(connection, message) {
    const { fullCollectionName, numberToSkip, numberToReturn } = message;

    // Emit query event for statistics
    this.emit('query', { connectionId: connection.id, collection: fullCollectionName });

    // Parse query from BSON buffer if present
    let query = {};
    let returnFieldsSelector = {};

    if (message.queryBuffer && message.queryBuffer.length > 0) {
      this.logger.debug('Query buffer length:', message.queryBuffer.length);
      try {
        // First BSON document is the query
        const querySize = message.queryBuffer.readInt32LE(0);
        if (querySize > 0 && querySize <= message.queryBuffer.length) {
          const queryDoc = message.queryBuffer.subarray(0, querySize);
          query = BSON.deserialize(queryDoc);

          // Second BSON document (if present) is the projection
          if (message.queryBuffer.length > querySize) {
            const projBuffer = message.queryBuffer.subarray(querySize);
            const projSize = projBuffer.readInt32LE(0);
            if (projSize > 0 && projSize <= projBuffer.length) {
              returnFieldsSelector = BSON.deserialize(projBuffer.subarray(0, projSize));
            }
          }
        }
      } catch (err) {
        // Best effort: fall back to whatever was decoded so far
        this.logger.error('Failed to parse query BSON:', err);
      }
    }

    // Use pre-parsed values only if they have actual content
    // Don't overwrite with empty objects from the default constructor
    if (message.query != null && Object.keys(message.query).length > 0) {
      query = message.query;
    }
    if (message.returnFieldsSelector != null) {
      returnFieldsSelector = message.returnFieldsSelector;
    }

    this.logger.info(`Query on ${fullCollectionName}:`, JSON.stringify(query));

    // Handle special collections
    const [database, collection] = fullCollectionName.split('.');

    if (collection === '$cmd') {
      return this.handleCommand(connection, database, query, message.header.requestID);
    }

    try {
      // Use CRUD handlers for query
      const result = await this.crudHandlers.handleFind(
        fullCollectionName,
        query,
        {
          skip: numberToSkip,
          limit: numberToReturn,
          projection: returnFieldsSelector
        }
      );

      // Create response
      return this.messageHandler.createOpReply(
        message.header.requestID,
        result.documents,
        result.cursorId
      );
    } catch (error) {
      this.logger.error('Query error:', error);
      return this.messageHandler.createErrorReply(
        message.header.requestID,
        error.message,
        1
      );
    }
  }

  /**
   * Handle OP_INSERT message
   *
   * Decodes the concatenated BSON documents from the raw buffer (pre-parsed
   * `message.documents` win when non-empty) and passes them to the CRUD
   * insert handler. Failures are logged only.
   *
   * @param {object} connection - Connection record
   * @param {object} message - Parsed OP_INSERT message
   * @returns {Promise<null>} Always null - OP_INSERT has no reply
   */
  async handleInsert(connection, message) {
    const { fullCollectionName, flags } = message;

    // Emit insert event for statistics
    this.emit('insert', { connectionId: connection.id, collection: fullCollectionName });

    // Parse documents from BSON buffer if present
    let documents = [];

    if (message.documentsBuffer && message.documentsBuffer.length > 0) {
      try {
        let offset = 0;
        const buffer = message.documentsBuffer;

        while (offset < buffer.length) {
          const docSize = buffer.readInt32LE(offset);
          // Stop on a non-positive or overrunning size so the loop always advances
          if (docSize <= 0 || offset + docSize > buffer.length) break;

          const docBuffer = buffer.subarray(offset, offset + docSize);
          const doc = BSON.deserialize(docBuffer);
          documents.push(doc);
          offset += docSize;
        }
      } catch (err) {
        // Best effort: keep the documents decoded before the failure
        this.logger.error('Failed to parse insert documents:', err);
      }
    }

    // Fallback to pre-parsed documents if available
    if (message.documents && message.documents.length > 0) {
      documents = message.documents;
    }

    this.logger.info(`Insert ${documents.length} documents into ${fullCollectionName}`);

    try {
      // Use CRUD handlers for insert
      await this.crudHandlers.handleInsert(fullCollectionName, documents, flags);

      // No response for OP_INSERT (fire and forget)
      return null;
    } catch (error) {
      this.logger.error('Insert error:', error);
      // Still return null as OP_INSERT doesn't get responses
      return null;
    }
  }

  /**
   * Handle OP_UPDATE message
   *
   * @param {object} connection - Connection record
   * @param {object} message - Parsed OP_UPDATE message
   * @returns {Promise<null>} Always null - OP_UPDATE has no reply; failures are logged only
   */
  async handleUpdate(connection, message) {
    const { fullCollectionName, selector, update, flags } = message;

    // Emit update event for statistics
    this.emit('update', { connectionId: connection.id, collection: fullCollectionName });

    this.logger.info(`Update in ${fullCollectionName}:`, { selector, update });

    try {
      // Use CRUD handlers for update
      await this.crudHandlers.handleUpdate(fullCollectionName, selector, update, flags);

      // No response for OP_UPDATE (fire and forget)
      return null;
    } catch (error) {
      this.logger.error('Update error:', error);
      return null;
    }
  }

  /**
   * Handle OP_DELETE message
   *
   * @param {object} connection - Connection record
   * @param {object} message - Parsed OP_DELETE message
   * @returns {Promise<null>} Always null - OP_DELETE has no reply; failures are logged only
   */
  async handleDelete(connection, message) {
    const { fullCollectionName, selector, flags } = message;

    // Emit delete event for statistics
    this.emit('delete', { connectionId: connection.id, collection: fullCollectionName });

    this.logger.info(`Delete from ${fullCollectionName}:`, selector);

    try {
      // Use CRUD handlers for delete
      await this.crudHandlers.handleDelete(fullCollectionName, selector, flags);

      // No response for OP_DELETE (fire and forget)
      return null;
    } catch (error) {
      this.logger.error('Delete error:', error);
      return null;
    }
  }

  /**
   * Handle OP_GET_MORE message
   *
   * @param {object} connection - Connection record
   * @param {object} message - Parsed OP_GET_MORE message
   * @returns {Promise<Buffer>} Reply with an empty batch and cursor id 0
   */
  async handleGetMore(connection, message) {
    const { fullCollectionName, cursorID } = message;

    this.logger.info(`Get more from cursor ${cursorID} in ${fullCollectionName}`);

    // No cursor support yet, return empty
    return this.messageHandler.createOpReply(
      message.header.requestID,
      [],
      0n
    );
  }

  /**
   * Handle OP_KILL_CURSORS message
   *
   * @param {object} connection - Connection record
   * @param {object} message - Parsed OP_KILL_CURSORS message
   * @returns {Promise<null>} Always null - OP_KILL_CURSORS has no reply
   */
  async handleKillCursors(connection, message) {
    const { cursorIDs } = message;

    this.logger.info(`Kill cursors:`, cursorIDs);

    // No response for OP_KILL_CURSORS
    return null;
  }

  /**
   * Handle OP_MSG message (modern protocol)
   *
   * Routes on the command name, which is the FIRST key of the command
   * document. Routing on mere presence of a key would misroute commands
   * that carry a same-named option (e.g. findAndModify with an `update`
   * field). Commands without a dedicated handler go to handleCommand.
   *
   * @param {object} connection - Connection record
   * @param {object} message - Parsed OP_MSG message
   * @returns {Promise<Buffer>} Encoded reply
   */
  async handleMsg(connection, message) {
    const requestId = message.header.requestID;

    // Extract command from first section
    if (!message.sections || message.sections.length === 0) {
      return this.messageHandler.createErrorReply(
        requestId,
        'Invalid OP_MSG: no sections',
        1
      );
    }

    const command = message.sections[0].document;
    if (!command) {
      return this.messageHandler.createErrorReply(
        requestId,
        'Invalid OP_MSG: no command document',
        1
      );
    }

    this.logger.info('OP_MSG command:', command);

    // Route based on command
    const [commandName] = Object.keys(command);
    const handlerName = OP_MSG_COMMAND_HANDLERS.get(commandName);
    if (handlerName) {
      return this[handlerName](connection, command, requestId);
    }

    return this.handleCommand(connection, command.$db || connection.database, command, requestId);
  }

  /**
   * Handle database commands
   *
   * Answers handshake, session, diagnostic and catalog commands directly.
   * Anything unrecognized gets an `ok: 0` reply with code 59.
   *
   * @param {object} connection - Connection record
   * @param {string} database - Database the command targets
   * @param {object} command - Command document
   * @param {number} requestId - Request id the reply responds to
   * @returns {Promise<Buffer>} Encoded OP_MSG reply
   */
  async handleCommand(connection, database, command, requestId) {
    this.logger.info(`Command on ${database}:`, command);

    // Handle isMaster/hello command
    if (command.isMaster || command.ismaster || command.hello) {
      return this.messageHandler.createOpMsg({
        ok: 1,
        ismaster: true,
        maxBsonObjectSize: 16777216,
        maxMessageSizeBytes: MAX_MESSAGE_LENGTH,
        maxWriteBatchSize: 100000,
        localTime: new Date(),
        minWireVersion: 0,
        maxWireVersion: 13,
        readOnly: false,
        // Session support
        logicalSessionTimeoutMinutes: 30,
        // Report the id this server assigned to the connection so repeated
        // handshakes on one connection agree with each other
        connectionId: connection?.id ?? 0,
        // Topology information
        msg: 'isdbgrid',
        topologyVersion: {
          processId: { $oid: '507f1f77bcf86cd799439011' },
          counter: 6
        },
        // Additional capabilities
        compression: ['snappy', 'zlib'],
        saslSupportedMechs: []
      }, 0, requestId);
    }

    // Handle ping
    if (command.ping) {
      return this.messageHandler.createOpMsg({ ok: 1 }, 0, requestId);
    }

    // Handle startSession
    if (command.startSession) {
      const sessionId = {
        id: {
          $binary: {
            base64: Buffer.from(Math.random().toString()).toString('base64'),
            subType: '04'
          }
        }
      };
      return this.messageHandler.createOpMsg({ ok: 1, ...sessionId }, 0, requestId);
    }

    // Handle endSessions
    if (command.endSessions) {
      return this.messageHandler.createOpMsg({ ok: 1 }, 0, requestId);
    }

    // Handle refreshSessions
    if (command.refreshSessions) {
      return this.messageHandler.createOpMsg({ ok: 1 }, 0, requestId);
    }

    // Handle connectionStatus
    if (command.connectionStatus) {
      const response = {
        authInfo: {
          authenticatedUsers: [],
          authenticatedUserRoles: []
        },
        ok: 1
      };

      // Add privileges if requested
      if (command.showPrivileges) {
        response.authInfo.authenticatedUserPrivileges = [];
      }

      return this.messageHandler.createOpMsg(response, 0, requestId);
    }

    // Handle getParameter
    if (command.getParameter) {
      const response = { ok: 1 };

      if (command.featureCompatibilityVersion) {
        response.featureCompatibilityVersion = {
          version: "7.0",
          targetVersion: "7.0"
        };
      }

      return this.messageHandler.createOpMsg(response, 0, requestId);
    }

    // Handle atlasVersion (MongoDB Atlas specific)
    if (command.atlasVersion) {
      // Not Atlas, return command not found
      return this.messageHandler.createOpMsg({
        ok: 0,
        errmsg: "Command not found: atlasVersion",
        code: 59
      }, 0, requestId);
    }

    // Handle buildInfo
    if (command.buildInfo) {
      return this.messageHandler.createOpMsg({
        version: "7.0.0",
        gitVersion: "mongtap",
        modules: [],
        allocator: "system",
        javascriptEngine: "none",
        sysInfo: "MongTap/DataFlood",
        versionArray: [7, 0, 0, 0],
        ok: 1
      }, 0, requestId);
    }

    // Handle hostInfo
    if (command.hostInfo) {
      return this.messageHandler.createOpMsg({
        system: {
          currentTime: new Date(),
          hostname: "localhost",
          cpuAddrSize: 64,
          memSizeMB: 8192,
          numCores: 4,
          cpuArch: "arm64"
        },
        ok: 1
      }, 0, requestId);
    }

    // Handle listDatabases
    if (command.listDatabases) {
      const databases = await this.storage.listDatabases();
      return this.messageHandler.createOpMsg({
        databases: databases.map(name => ({
          name,
          sizeOnDisk: 0,
          empty: false
        })),
        totalSize: 0,
        ok: 1
      }, 0, requestId);
    }

    // Handle listCollections
    if (command.listCollections) {
      // One storage lookup is enough for every database (including the
      // configured default one); de-duplicate so a name is listed once.
      const collections = Array.from(new Set(await this.storage.listCollections(database)));

      // Create properly formatted cursor response
      const cursorDoc = {
        id: new Long(0, 0), // Use BSON Long type for cursor ID
        ns: `${database}.$cmd.listCollections`,
        firstBatch: collections.map(name => ({
          name,
          type: 'collection',
          options: {},
          info: {
            readOnly: false,
            uuid: null
          },
          idIndex: {
            v: 2,
            key: { _id: 1 },
            name: '_id_'
          }
        }))
      };

      return this.messageHandler.createOpMsg({
        cursor: cursorDoc,
        ok: 1
      }, 0, requestId);
    }

    // Handle getLastError
    if (command.getLastError || command.getlasterror) {
      return this.messageHandler.createOpMsg({
        ok: 1,
        err: null,
        n: 0
      }, 0, requestId);
    }

    // Unknown command
    return this.messageHandler.createOpMsg({
      ok: 0,
      errmsg: `Unknown command: ${Object.keys(command).join(', ')}`,
      code: 59
    }, 0, requestId);
  }

  /**
   * Run one CRUD command handler and encode its outcome as an OP_MSG reply.
   * Shared by all OP_MSG-style command handlers so success and failure
   * replies have one consistent shape.
   *
   * @param {string} handlerName - Method name on this.crudHandlers
   * @param {string} label - Human-readable command label used in the error log
   * @param {object} command - Command document passed to the handler
   * @param {number} requestId - Request id the reply responds to
   * @returns {Promise<Buffer>} Encoded OP_MSG reply (`ok: 0` with errmsg/code on failure)
   */
  async runCrudCommand(handlerName, label, command, requestId) {
    try {
      const result = await this.crudHandlers[handlerName](command);
      return this.messageHandler.createOpMsg(result, 0, requestId);
    } catch (error) {
      this.logger.error(`${label} command error:`, error);
      return this.messageHandler.createOpMsg({
        ok: 0,
        errmsg: error.message,
        code: error.code || 1
      }, 0, requestId);
    }
  }

  /**
   * Handle insert command (OP_MSG style)
   */
  async handleInsertCommand(connection, command, requestId) {
    return this.runCrudCommand('handleInsertCommand', 'Insert', command, requestId);
  }

  /**
   * Handle find command (OP_MSG style)
   */
  async handleFindCommand(connection, command, requestId) {
    return this.runCrudCommand('handleFindCommand', 'Find', command, requestId);
  }

  /**
   * Handle update command (OP_MSG style)
   */
  async handleUpdateCommand(connection, command, requestId) {
    return this.runCrudCommand('handleUpdateCommand', 'Update', command, requestId);
  }

  /**
   * Handle delete command (OP_MSG style)
   */
  async handleDeleteCommand(connection, command, requestId) {
    return this.runCrudCommand('handleDeleteCommand', 'Delete', command, requestId);
  }

  /**
   * Handle aggregate command (OP_MSG style)
   */
  async handleAggregateCommand(connection, command, requestId) {
    return this.runCrudCommand('handleAggregateCommand', 'Aggregate', command, requestId);
  }

  /**
   * Handle count command (OP_MSG style)
   */
  async handleCountCommand(connection, command, requestId) {
    return this.runCrudCommand('handleCountCommand', 'Count', command, requestId);
  }

  /**
   * Handle createIndexes command (OP_MSG style)
   */
  async handleCreateIndexCommand(connection, command, requestId) {
    return this.runCrudCommand('handleCreateIndexCommand', 'CreateIndex', command, requestId);
  }

  /**
   * Handle listIndexes command (OP_MSG style)
   */
  async handleListIndexesCommand(connection, command, requestId) {
    return this.runCrudCommand('handleListIndexesCommand', 'ListIndexes', command, requestId);
  }

  /**
   * Extract constraints from MongoDB query for generation
   *
   * Keeps only top-level fields whose value is a plain equality, i.e. the
   * field name does not start with `$` and the value is not an object
   * containing a `$`-prefixed key.
   *
   * @param {object} query - MongoDB query document
   * @returns {object} Map of field name to required value
   */
  extractConstraints(query) {
    const constraints = {};

    if (!query || typeof query !== 'object') {
      return constraints;
    }

    const isOperatorObject = (value) =>
      value && typeof value === 'object' &&
      Object.keys(value).some(k => k.startsWith('$'));

    // Extract simple equality constraints
    for (const [field, value] of Object.entries(query)) {
      if (!field.startsWith('$') && !isOperatorObject(value)) {
        constraints[field] = value;
      }
    }

    return constraints;
  }

  /**
   * Filter documents based on MongoDB query
   * Basic implementation - can be enhanced
   *
   * @param {object[]} documents - Candidate documents
   * @param {object} query - MongoDB query document; empty/missing matches all
   * @returns {object[]} The input array itself when the query is empty, otherwise a filtered copy
   */
  filterDocuments(documents, query) {
    if (!query || Object.keys(query).length === 0) {
      return documents;
    }

    return documents.filter(doc => this.matchesQuery(doc, query));
  }

  /**
   * Check if a document matches a query
   *
   * Every non-null object condition is evaluated as a set of operators
   * (see matchesOperators); everything else is compared with `===`.
   *
   * @param {object} doc - Document to test
   * @param {object} query - MongoDB query document
   * @returns {boolean} True when every field condition holds
   */
  matchesQuery(doc, query) {
    for (const [field, condition] of Object.entries(query)) {
      const value = this.getFieldValue(doc, field);

      // Handle operators
      if (typeof condition === 'object' && condition !== null) {
        if (!this.matchesOperators(value, condition)) {
          return false;
        }
      } else if (value !== condition) {
        // Simple equality
        return false;
      }
    }

    return true;
  }

  /**
   * Get nested field value from document
   *
   * @param {object} doc - Document to read from
   * @param {string} path - Dot-separated field path
   * @returns {*} The value, or undefined when any segment is null/undefined
   */
  getFieldValue(doc, path) {
    const parts = path.split('.');
    let current = doc;

    for (const part of parts) {
      if (current == null) return undefined;
      current = current[part];
    }

    return current;
  }

  /**
   * Match value against operator conditions
   *
   * Supports $eq, $ne, $gt, $gte, $lt, $lte, $in, $nin and $exists.
   * Unknown operators are ignored (treated as satisfied).
   *
   * @param {*} value - Field value from the document
   * @param {object} operators - Operator document, e.g. `{ $gt: 1, $lt: 5 }`
   * @returns {boolean} True when all recognized operators are satisfied
   */
  matchesOperators(value, operators) {
    for (const [op, operand] of Object.entries(operators)) {
      switch (op) {
        case '$eq':
          if (value !== operand) return false;
          break;
        case '$ne':
          if (value === operand) return false;
          break;
        case '$gt':
          if (!(value > operand)) return false;
          break;
        case '$gte':
          if (!(value >= operand)) return false;
          break;
        case '$lt':
          if (!(value < operand)) return false;
          break;
        case '$lte':
          if (!(value <= operand)) return false;
          break;
        case '$in':
          if (!operand.includes(value)) return false;
          break;
        case '$nin':
          if (operand.includes(value)) return false;
          break;
        case '$exists':
          // Clients commonly send 1/0 instead of true/false; coerce so a
          // numeric operand is not rejected by the strict comparison.
          if ((value !== undefined) !== Boolean(operand)) return false;
          break;
        default:
          // Unknown operator, ignore for now
          break;
      }
    }

    return true;
  }

  /**
   * Send response to client
   *
   * @param {object} connection - Connection record
   * @param {Buffer} responseBuffer - Encoded reply to write
   */
  sendResponse(connection, responseBuffer) {
    // Message handling is asynchronous, so the client may already be gone;
    // skip the write (and the "sent" counters) for a destroyed socket.
    if (connection.socket.destroyed) {
      this.logger.debug(`Dropped ${responseBuffer.length} bytes for closed connection ${connection.id}`);
      return;
    }

    connection.socket.write(responseBuffer);

    connection.stats.messagesSent++;
    connection.stats.bytesSent += responseBuffer.length;
    this.stats.messagesSent++;
    this.stats.bytesSent += responseBuffer.length;

    this.logger.debug(`Sent ${responseBuffer.length} bytes to connection ${connection.id}`);
  }

  /**
   * Get server statistics
   *
   * @returns {object} Server counters, `uptime` in milliseconds (0 before
   *   start) and a per-connection summary list
   */
  getStats() {
    return {
      ...this.stats,
      uptime: this.stats.startTime ? Date.now() - this.stats.startTime.getTime() : 0,
      connections: Array.from(this.connections.values()).map(conn => ({
        id: conn.id,
        remoteAddress: conn.remoteAddress,
        database: conn.database,
        uptime: Date.now() - conn.startTime.getTime(),
        stats: conn.stats
      }))
    };
  }
}

// Export as default
export default MongoDBServer;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/smallmindsco/MongTap'

If you have feedback or need assistance with the MCP directory API, please join our Discord server