Skip to main content
Glama
NeDBStorageAdapter.ts13.8 kB
import { StorageAdapter } from './StorageAdapter';
import { OperationResult, OperationStatus } from '../types';
import Datastore from 'nedb';
import fs from 'fs';
import path from 'path';
import logger from '../../utils/logger';

/**
 * NeDB-backed storage adapter for operation results.
 *
 * Two datastores live under `dbPath`:
 * - `results.db`    – operation results (and log entries, see `storeLogEntry`)
 * - `operations.db` – registration / cancellation bookkeeping
 *
 * Both carry a unique index on `logId`. Cancel callbacks are kept only in
 * memory, so they do not survive a process restart.
 */
export class NeDBStorageAdapter implements StorageAdapter {
  private resultsDb: Datastore;
  private operationsDb: Datastore;
  private cancelFunctions: Map<string, () => void> = new Map();
  private initialized = false;

  /**
   * Creates the database directory if needed and prepares (but does not
   * load) the two datastores. Loading happens lazily in `initialize`.
   *
   * @param dbPath Path to the database directory
   */
  constructor(private readonly dbPath: string) {
    if (!fs.existsSync(dbPath)) {
      fs.mkdirSync(dbPath, { recursive: true });
    }

    this.resultsDb = new Datastore({
      filename: path.join(dbPath, 'results.db'),
      autoload: false
    });
    this.operationsDb = new Datastore({
      filename: path.join(dbPath, 'operations.db'),
      autoload: false
    });
  }

  /**
   * Loads both datastores and ensures the unique `logId` indexes exist.
   * Calling it again after a successful initialization is a no-op.
   *
   * @throws Re-throws any load/index error after logging it
   */
  public async initialize(): Promise<void> {
    if (this.initialized) {
      return;
    }

    try {
      await this.loadDatabase(this.resultsDb);
      await this.loadDatabase(this.operationsDb);

      await this.createIndex(this.resultsDb, 'logId', { unique: true });
      await this.createIndex(this.operationsDb, 'logId', { unique: true });

      this.initialized = true;
    } catch (error) {
      logger.error(`Error initializing NeDBStorageAdapter: ${this.describeError(error)}`);
      throw error;
    }
  }

  /**
   * Stores (inserts or updates) an operation result.
   *
   * Accepts either `(logId, result)` or a single `result` that carries its
   * own `logId`. In the two-argument form the explicit `logId` is
   * authoritative: the result is stored under it even if `result.logId` is
   * missing or different, so a later `getResult(logId)` finds it.
   *
   * @param logIdOrResult Log ID or operation result
   * @param result Operation result (only with the two-argument form)
   * @throws Error if the arguments match neither form
   */
  public async storeResult(logIdOrResult: string | OperationResult, result?: OperationResult): Promise<void> {
    await this.ensureInitialized();

    try {
      let resultToStore: OperationResult;

      if (typeof logIdOrResult === 'string' && result) {
        // Copy instead of mutating the caller's object.
        resultToStore = { ...result, logId: logIdOrResult };
      } else if (typeof logIdOrResult === 'object' && logIdOrResult.logId) {
        resultToStore = logIdOrResult;
      } else {
        throw new Error('Invalid arguments to storeResult');
      }

      await this.upsertResult(resultToStore);
    } catch (error) {
      logger.error(`Error storing result: ${this.describeError(error)}`);
      throw error;
    }
  }

  /**
   * Looks up a stored result by log ID.
   *
   * @param logId Log ID
   * @returns The stored result, or null if not found or if the lookup failed
   *          (lookup errors are logged, not thrown)
   */
  public async getResult<T = any>(logId: string): Promise<OperationResult<T> | null> {
    await this.ensureInitialized();

    try {
      return await this.findOne<OperationResult<T>>(this.resultsDb, { logId });
    } catch (error) {
      logger.error(`Error getting result: ${this.describeError(error)}`);
      return null;
    }
  }

  /**
   * Registers a cancellation callback for a running operation and records
   * the registration in the operations datastore.
   *
   * @param logId Log ID
   * @param cancelFn Function to call to cancel the operation
   */
  public async registerRunningOperation(logId: string, cancelFn: () => void): Promise<void> {
    await this.ensureInitialized();

    try {
      this.cancelFunctions.set(logId, cancelFn);
      await this.upsert(this.operationsDb, {
        logId,
        registered: true,
        registeredAt: Date.now()
      });
    } catch (error) {
      logger.error(`Error registering operation: ${this.describeError(error)}`);
      throw error;
    }
  }

  /**
   * Drops the in-memory cancel callback for an operation and marks it as
   * unregistered in the operations datastore.
   *
   * @param logId Log ID
   */
  public async unregisterRunningOperation(logId: string): Promise<void> {
    await this.ensureInitialized();

    try {
      this.cancelFunctions.delete(logId);
      await this.upsert(this.operationsDb, {
        logId,
        registered: false,
        unregisteredAt: Date.now()
      });
    } catch (error) {
      logger.error(`Error unregistering operation: ${this.describeError(error)}`);
      throw error;
    }
  }

  /**
   * Cancels a running operation by invoking its registered callback, then
   * records the cancellation and marks any stored result as cancelled.
   *
   * @param logId Log ID
   * @returns True if a callback was registered and cancellation completed;
   *          false if none was registered or an error occurred (logged)
   */
  public async cancelOperation(logId: string): Promise<boolean> {
    await this.ensureInitialized();

    try {
      const cancelFn = this.cancelFunctions.get(logId);
      if (!cancelFn) {
        return false;
      }

      cancelFn();

      await this.upsert(this.operationsDb, {
        logId,
        cancelled: true,
        cancelledAt: Date.now()
      });

      const result = await this.getResult(logId);
      if (result) {
        result.status = OperationStatus.CANCELLED;
        result.isComplete = true;
        result.endTime = Date.now();
        result.message = 'Operation was cancelled';
        await this.storeResult(result);
      }

      return true;
    } catch (error) {
      logger.error(`Error cancelling operation: ${this.describeError(error)}`);
      return false;
    }
  }

  /**
   * Lists a summary of every document in the results datastore.
   *
   * @returns Array of operation info; empty if the query fails (logged)
   */
  public async listOperations(): Promise<any[]> {
    await this.ensureInitialized();

    try {
      const results = await this.find<OperationResult>(this.resultsDb, {});

      return results.map(result => ({
        logId: result.logId,
        status: result.status,
        isComplete: result.isComplete,
        startTime: result.startTime || 0,
        endTime: result.endTime,
        operationType: typeof result.result
      }));
    } catch (error) {
      logger.error(`Error listing operations: ${this.describeError(error)}`);
      return [];
    }
  }

  /**
   * Deletes completed results whose `endTime` is older than `maxAge`,
   * together with their bookkeeping records in the operations datastore.
   *
   * @param maxAge Maximum age in milliseconds
   * @throws Re-throws any datastore error after logging it
   */
  public async cleanupCompletedOperations(maxAge: number): Promise<void> {
    await this.ensureInitialized();

    try {
      const cutoff = Date.now() - maxAge;

      // Collect the expired logIds BEFORE deleting anything: once the results
      // are gone there is nothing left to tell us which operations to purge.
      const expired = await this.find<OperationResult>(this.resultsDb, {
        isComplete: true,
        endTime: { $lt: cutoff }
      });
      if (expired.length === 0) {
        return;
      }

      const byLogId = { logId: { $in: expired.map(r => r.logId) } };

      // NeDB removes only the first match unless `multi` is set.
      await this.remove(this.resultsDb, byLogId, { multi: true });
      await this.remove(this.operationsDb, byLogId, { multi: true });
    } catch (error) {
      logger.error(`Error cleaning up operations: ${this.describeError(error)}`);
      throw error;
    }
  }

  /**
   * Inserts a log entry document into the results datastore. The entry's own
   * fields are spread after `logId`/`logName` (so they can override them),
   * and `timestamp` is always set to the current time in ISO format.
   *
   * NOTE(review): the results datastore has a unique index on `logId`, so a
   * second entry with the same `logId` (or one colliding with a stored
   * result) will be rejected by NeDB. If multiple entries per log are
   * expected, they likely need a dedicated datastore — confirm with callers.
   *
   * @param logId Log ID
   * @param logName Log name
   * @param logEntry Log entry
   */
  public async storeLogEntry(logId: string, logName: string, logEntry: any): Promise<void> {
    await this.ensureInitialized();

    const document = {
      logId,
      logName,
      ...logEntry,
      timestamp: new Date().toISOString()
    };

    await this.insert(this.resultsDb, document);
  }

  /**
   * Returns the most recent log entries with the given name, newest first.
   *
   * @param logName Log name
   * @param limit Maximum number of logs to return
   * @returns Logs
   */
  public async getLogsByName(logName: string, limit: number = 10): Promise<any[]> {
    await this.ensureInitialized();

    return this.find(this.resultsDb, { logName }, { timestamp: -1 }, limit);
  }

  /**
   * Marks the adapter as uninitialized. NeDB itself needs no teardown; the
   * next operation will reload the datastores via `ensureInitialized`.
   */
  public async close(): Promise<void> {
    this.initialized = false;
  }

  /**
   * Lazily initializes the adapter before any datastore access.
   */
  private async ensureInitialized(): Promise<void> {
    if (!this.initialized) {
      await this.initialize();
    }
  }

  /**
   * Renders an unknown thrown value as a log-friendly string.
   *
   * @param error Value caught in a `catch` clause
   */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Promise wrapper around `Datastore#loadDatabase`.
   *
   * @param db Database to load
   */
  private loadDatabase(db: Datastore): Promise<void> {
    return new Promise((resolve, reject) => {
      db.loadDatabase((err: Error | null) => (err ? reject(err) : resolve()));
    });
  }

  /**
   * Promise wrapper around `Datastore#ensureIndex`.
   *
   * @param db Database to create the index in
   * @param fieldName Field to index
   * @param options Index options
   */
  private createIndex(db: Datastore, fieldName: string, options: any): Promise<void> {
    return new Promise((resolve, reject) => {
      db.ensureIndex({ fieldName, ...options }, (err: Error | null) => (err ? reject(err) : resolve()));
    });
  }

  /**
   * Promise wrapper around a NeDB cursor query.
   *
   * @param db Database to search
   * @param query Query to execute
   * @param sort Sort options (applied only when provided)
   * @param limit Maximum number of documents to return (a falsy value means no limit)
   * @returns Array of documents
   */
  private find<T>(db: Datastore, query: any, sort?: any, limit?: number): Promise<T[]> {
    return new Promise((resolve, reject) => {
      let cursor = db.find(query);
      if (sort) {
        cursor = cursor.sort(sort);
      }
      if (limit) {
        cursor = cursor.limit(limit);
      }

      cursor.exec((err: Error | null, docs: any[]) => {
        if (err) {
          reject(err);
        } else {
          resolve(docs as T[]);
        }
      });
    });
  }

  /**
   * Promise wrapper around `Datastore#findOne`.
   *
   * @param db Database to search
   * @param query Query to execute
   * @returns Document or null if not found
   */
  private findOne<T>(db: Datastore, query: any): Promise<T | null> {
    return new Promise((resolve, reject) => {
      db.findOne(query, (err: Error | null, doc: any) => {
        if (err) {
          reject(err);
        } else {
          resolve(doc ? (doc as T) : null);
        }
      });
    });
  }

  /**
   * Promise wrapper around `Datastore#insert`.
   *
   * @param db Database to insert into
   * @param doc Document to insert
   * @returns Inserted document
   */
  private insert<T>(db: Datastore, doc: any): Promise<T> {
    return new Promise((resolve, reject) => {
      db.insert(doc, (err: Error | null, newDoc: any) => {
        if (err) {
          reject(err);
        } else {
          resolve(newDoc as T);
        }
      });
    });
  }

  /**
   * Promise wrapper around `Datastore#update`.
   *
   * @param db Database to update
   * @param query Query to find the document
   * @param update Update to apply
   * @param options Update options
   * @returns Number of documents updated
   */
  private update(db: Datastore, query: any, update: any, options: any = {}): Promise<number> {
    return new Promise((resolve, reject) => {
      db.update(query, update, options, (err: Error | null, numAffected: number) => {
        if (err) {
          reject(err);
        } else {
          resolve(numAffected);
        }
      });
    });
  }

  /**
   * Promise wrapper around `Datastore#remove`.
   *
   * @param db Database to remove from
   * @param query Query to find the documents
   * @param options Remove options (pass `{ multi: true }` to remove every match)
   * @returns Number of documents removed
   */
  private remove(db: Datastore, query: any, options: any = {}): Promise<number> {
    return new Promise((resolve, reject) => {
      db.remove(query, options, (err: Error | null, numRemoved: number) => {
        if (err) {
          reject(err);
        } else {
          resolve(numRemoved);
        }
      });
    });
  }

  /**
   * Inserts or updates the document identified by `doc.logId`.
   *
   * Uses NeDB's own `upsert` option so the existence check and the write
   * happen in a single datastore operation; a separate find-then-insert can
   * race with a concurrent writer and trip the unique `logId` index.
   *
   * Best-effort by design: failures are logged and reported as `0` rather
   * than thrown.
   *
   * @param db Database to upsert into
   * @param doc Document to upsert
   * @param label Noun used in the error log message
   * @returns Number of documents written, or 0 on failure
   */
  private async upsert(db: Datastore, doc: any, label: string = 'document'): Promise<number> {
    try {
      // `_id` is NeDB's immutable key; a document previously read from the
      // datastore carries one, and $set-ing it during an insert would fail.
      const { _id: _ignoredId, ...fields } = doc;

      return await this.update(
        db,
        { logId: doc.logId },
        { $set: fields },
        { multi: false, upsert: true }
      );
    } catch (error) {
      // Log the error but don't fail - this allows tests to continue
      logger.error(`Error upserting ${label}: ${this.describeError(error)}`);
      return 0;
    }
  }

  /**
   * Inserts or updates an operation result in the results datastore.
   *
   * @param result Result to upsert
   * @returns Number of documents written, or 0 on failure
   */
  private upsertResult(result: OperationResult): Promise<number> {
    return this.upsert(this.resultsDb, result, 'result');
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TSavo/Unity-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.