Skip to main content
Glama
connection.js15.3 kB
/** * MongoDB Connection Handler * Manages individual client connections and message processing */ import { EventEmitter } from 'events'; import { OpcodeParser, OpCode } from '../protocol/opcodes.js'; import MessageHandler from '../protocol/message.js'; import { BSON } from '../protocol/bson.js'; /** * Connection states */ export const ConnectionState = { CONNECTING: 'connecting', CONNECTED: 'connected', AUTHENTICATED: 'authenticated', CLOSING: 'closing', CLOSED: 'closed', ERROR: 'error' }; /** * Individual MongoDB client connection */ export class Connection extends EventEmitter { constructor(socket, options = {}) { super(); // Connection properties this.id = options.id || Math.random().toString(36).substring(7); this.socket = socket; this.remoteAddress = `${socket.remoteAddress}:${socket.remotePort}`; this.state = ConnectionState.CONNECTING; // Configuration this.maxMessageSize = options.maxMessageSize || 48000000; // 48MB default this.timeout = options.timeout || 300000; // 5 minutes default // Session properties this.database = options.defaultDatabase || 'test'; this.authenticated = options.requireAuth ? false : true; this.username = null; this.sessionId = null; // Message handling this.messageHandler = new MessageHandler(options.logger); this.buffer = Buffer.alloc(0); this.activeMessage = null; this.cursors = new Map(); // Track active cursors // Statistics this.stats = { connectedAt: new Date(), lastActivity: new Date(), messagesReceived: 0, messagesSent: 0, bytesReceived: 0, bytesSent: 0, errors: 0 }; // Logging this.logger = options.logger || this.createDefaultLogger(); // Setup socket handlers this.setupSocketHandlers(); // Set initial state this.state = ConnectionState.CONNECTED; this.emit('connected'); } createDefaultLogger() { return { debug: () => {}, info: console.error, warn: console.warn, error: console.error }; } /** * Setup socket event handlers */ setupSocketHandlers() { // Handle incoming data this.socket.on('data', (data) => { this.handleData(data); }); // Handle socket errors this.socket.on('error', (err) => { this.handleError(err); }); // Handle socket close this.socket.on('close', (hadError) => { this.handleClose(hadError); }); // Handle socket timeout this.socket.on('timeout', () => { this.handleTimeout(); }); // Set socket timeout if (this.timeout > 0) { this.socket.setTimeout(this.timeout); } } /** * Handle incoming data */ handleData(data) { try { // Update statistics this.stats.bytesReceived += data.length; this.stats.lastActivity = new Date(); // Add to buffer this.buffer = Buffer.concat([this.buffer, data]); // Process complete messages this.processBuffer(); } catch (err) { this.logger.error(`Connection ${this.id} data handling error:`, err); this.handleError(err); } } /** * Process buffered data for complete messages */ processBuffer() { while (this.buffer.length >= 16) { // Minimum message header size // Read message length const messageLength = this.buffer.readInt32LE(0); // Validate message size if (messageLength < 16 || messageLength > this.maxMessageSize) { throw new Error(`Invalid message size: ${messageLength}`); } // Check if we have complete message if (this.buffer.length < messageLength) { // Wait for more data break; } // Extract complete message const messageBuffer = this.buffer.slice(0, messageLength); this.buffer = this.buffer.slice(messageLength); // Process the message this.processMessage(messageBuffer); } } /** * Process a complete message */ processMessage(messageBuffer) { try { // Update statistics this.stats.messagesReceived++; // Parse message const message = OpcodeParser.parseMessage(messageBuffer); this.logger.debug(`Connection ${this.id} received ${message.constructor.name}`); // Emit message event this.emit('message', { opCode: message.header.opCode, requestId: message.header.requestID, message }); // Store active message for response correlation this.activeMessage = message; } catch (err) { this.logger.error(`Connection ${this.id} message parsing error:`, err); this.stats.errors++; // Send error response const errorReply = this.messageHandler.createErrorReply( 0, `Message parsing error: ${err.message}`, 1 ); this.send(errorReply); } } /** * Send data to client */ send(data) { if (this.state === ConnectionState.CLOSED || this.state === ConnectionState.CLOSING) { this.logger.warn(`Connection ${this.id} attempted to send while ${this.state}`); return false; } try { // Write to socket this.socket.write(data); // Update statistics this.stats.messagesSent++; this.stats.bytesSent += data.length; this.stats.lastActivity = new Date(); this.logger.debug(`Connection ${this.id} sent ${data.length} bytes`); return true; } catch (err) { this.logger.error(`Connection ${this.id} send error:`, err); this.handleError(err); return false; } } /** * Send reply message */ sendReply(documents, cursorId = 0n, flags = 0) { if (!this.activeMessage) { this.logger.warn(`Connection ${this.id} no active message for reply`); return false; } const reply = this.messageHandler.createOpReply( this.activeMessage.header.requestID, documents, cursorId, flags ); return this.send(reply); } /** * Send error reply */ sendError(errorMessage, errorCode = 1) { const requestId = this.activeMessage ? this.activeMessage.header.requestID : 0; const errorReply = this.messageHandler.createErrorReply( requestId, errorMessage, errorCode ); return this.send(errorReply); } /** * Send success reply */ sendSuccess(result = {}) { const requestId = this.activeMessage ? this.activeMessage.header.requestID : 0; const successReply = this.messageHandler.createSuccessReply( requestId, result ); return this.send(successReply); } /** * Handle socket error */ handleError(err) { this.logger.error(`Connection ${this.id} error:`, err); this.stats.errors++; this.state = ConnectionState.ERROR; this.emit('error', err); // Close connection on error this.close(); } /** * Handle socket close */ handleClose(hadError) { if (this.state === ConnectionState.CLOSED) { return; // Already closed } this.logger.info(`Connection ${this.id} closed${hadError ? ' with error' : ''}`); // Clear cursors this.cursors.clear(); // Update state this.state = ConnectionState.CLOSED; // Clear buffer this.buffer = Buffer.alloc(0); // Emit close event this.emit('close', hadError); } /** * Handle socket timeout */ handleTimeout() { this.logger.info(`Connection ${this.id} timed out`); this.emit('timeout'); this.close(); } /** * Switch database */ useDatabase(database) { this.database = database; this.logger.info(`Connection ${this.id} switched to database: ${database}`); this.emit('databaseChanged', database); } /** * Authenticate connection */ authenticate(username, password) { // Simple authentication (can be enhanced) this.authenticated = true; this.username = username; this.state = ConnectionState.AUTHENTICATED; this.logger.info(`Connection ${this.id} authenticated as ${username}`); this.emit('authenticated', username); return true; } /** * Create a cursor */ createCursor(collection, query, options = {}) { const cursorId = BigInt(Date.now()) * 1000n + BigInt(Math.floor(Math.random() * 1000)); const cursor = { id: cursorId, collection, query, options, position: 0, documents: [], closed: false, createdAt: new Date() }; this.cursors.set(cursorId, cursor); this.logger.debug(`Connection ${this.id} created cursor ${cursorId} for ${collection}`); return cursorId; } /** * Get cursor by ID */ getCursor(cursorId) { return this.cursors.get(cursorId); } /** * Kill cursor */ killCursor(cursorId) { if (this.cursors.has(cursorId)) { const cursor = this.cursors.get(cursorId); cursor.closed = true; this.cursors.delete(cursorId); this.logger.debug(`Connection ${this.id} killed cursor ${cursorId}`); return true; } return false; } /** * Kill all cursors */ killAllCursors() { const count = this.cursors.size; this.cursors.clear(); if (count > 0) { this.logger.debug(`Connection ${this.id} killed ${count} cursors`); } return count; } /** * Close connection */ close() { if (this.state === ConnectionState.CLOSED || this.state === ConnectionState.CLOSING) { return; } this.state = ConnectionState.CLOSING; // Kill all cursors this.killAllCursors(); // Close socket if (this.socket && !this.socket.destroyed) { this.socket.end(); this.socket.destroy(); } this.state = ConnectionState.CLOSED; this.logger.info(`Connection ${this.id} closed`); } /** * Get connection information */ getInfo() { return { id: this.id, remoteAddress: this.remoteAddress, state: this.state, database: this.database, authenticated: this.authenticated, username: this.username, cursors: this.cursors.size, stats: { ...this.stats, uptime: Date.now() - this.stats.connectedAt.getTime() } }; } /** * Check if connection is alive */ isAlive() { return this.state === ConnectionState.CONNECTED || this.state === ConnectionState.AUTHENTICATED; } /** * Reset connection statistics */ resetStats() { this.stats.messagesReceived = 0; this.stats.messagesSent = 0; this.stats.bytesReceived = 0; this.stats.bytesSent = 0; this.stats.errors = 0; this.stats.lastActivity = new Date(); } } // Export connection pool manager export class ConnectionPool extends EventEmitter { constructor(options = {}) { super(); this.connections = new Map(); this.maxConnections = options.maxConnections || 1000; this.logger = options.logger || console; // Statistics this.stats = { totalConnections: 0, activeConnections: 0, rejectedConnections: 0 }; } /** * Add connection to pool */ add(connection) { if (this.connections.size >= this.maxConnections) { this.stats.rejectedConnections++; this.logger.warn(`Connection pool full, rejecting connection ${connection.id}`); connection.close(); return false; } this.connections.set(connection.id, connection); this.stats.totalConnections++; this.stats.activeConnections++; // Handle connection close connection.on('close', () => { this.remove(connection.id); }); this.emit('connectionAdded', connection); return true; } /** * Remove connection from pool */ remove(connectionId) { if (this.connections.has(connectionId)) { const connection = this.connections.get(connectionId); this.connections.delete(connectionId); this.stats.activeConnections--; this.emit('connectionRemoved', connection); return true; } return false; } /** * Get connection by ID */ get(connectionId) { return this.connections.get(connectionId); } /** * Get all connections */ getAll() { return Array.from(this.connections.values()); } /** * Broadcast to all connections */ broadcast(data) { let sent = 0; for (const connection of this.connections.values()) { if (connection.send(data)) { sent++; } } return sent; } /** * Close all connections */ closeAll() { for (const connection of this.connections.values()) { connection.close(); } this.connections.clear(); this.stats.activeConnections = 0; } /** * Get pool statistics */ getStats() { return { ...this.stats, connections: this.getAll().map(conn => conn.getInfo()) }; } } // Export everything export default Connection;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/smallmindsco/MongTap'

If you have feedback or need assistance with the MCP directory API, please join our Discord server