Skip to main content
Glama
kuzu-transaction-manager.ts7.47 kB
import { BaseKuzuClient } from '../base/base-kuzu-client'; import { KuzuConnectionManager } from './kuzu-connection-manager'; import { logError } from '../../utils/logger'; /** * Service responsible for managing database transactions * Handles transaction lifecycle, rollback, and nested transaction support */ export class KuzuTransactionManager extends BaseKuzuClient { private connectionManager: KuzuConnectionManager; private activeTransactions = new Set<string>(); constructor(clientProjectRoot: string, connectionManager: KuzuConnectionManager) { super(clientProjectRoot); this.connectionManager = connectionManager; } /** * Execute a series of queries within a transaction */ async transaction<T>( transactionBlock: (tx: { executeQuery: (query: string, params?: Record<string, any>) => Promise<any>; }) => Promise<T>, ): Promise<T> { if (!this.connectionManager.isConnected()) { await this.connectionManager.initialize(); } const transactionId = `tx-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; const logger = this.createOperationLogger('transaction', { transactionId }); logger.debug('Beginning transaction'); try { const connection = this.connectionManager.getConnection(); await connection.query('BEGIN TRANSACTION'); this.activeTransactions.add(transactionId); const txContext = { executeQuery: async (query: string, params?: Record<string, any>): Promise<any> => { logger.debug({ query, params }, 'Executing query in transaction'); // When parameters are provided, use prepared statements to avoid the // "progressCallback must be a function" error that occurs when // passing a params object directly to connection.query(). if (params && Object.keys(params).length > 0) { const prepared = await connection.prepare(query); return connection.execute(prepared, params); } // No params: run the query directly. return connection.query(query); }, }; const result = await transactionBlock(txContext); await connection.query('COMMIT'); this.activeTransactions.delete(transactionId); logger.debug('Transaction committed successfully'); return result; } catch (error) { logger.error({ error }, 'Transaction failed, rolling back'); try { const connection = this.connectionManager.getConnection(); await connection.query('ROLLBACK'); this.activeTransactions.delete(transactionId); logger.debug('Transaction rolled back successfully'); } catch (rollbackError) { logger.error({ rollbackError }, 'Failed to rollback transaction'); this.activeTransactions.delete(transactionId); } throw error; } } /** * Execute a transaction with automatic retry on failure */ async transactionWithRetry<T>( transactionBlock: (tx: { executeQuery: (query: string, params?: Record<string, any>) => Promise<any>; }) => Promise<T>, maxRetries: number = 3, retryDelay: number = 1000, ): Promise<T> { const logger = this.createOperationLogger('transaction-with-retry', { maxRetries }); let lastError: Error | null = null; for (let attempt = 1; attempt <= maxRetries; attempt++) { try { logger.debug({ attempt }, 'Attempting transaction'); return await this.transaction(transactionBlock); } catch (error) { lastError = error as Error; logger.warn({ attempt, error }, 'Transaction attempt failed'); if (attempt < maxRetries) { logger.debug({ retryDelay }, 'Waiting before retry'); await new Promise((resolve) => setTimeout(resolve, retryDelay)); } } } logger.error({ maxRetries }, 'All transaction attempts failed'); throw lastError || new Error('Transaction failed after all retry attempts'); } /** * Execute multiple transactions in sequence */ async sequentialTransactions<T>( transactionBlocks: Array< (tx: { executeQuery: (query: string, params?: Record<string, any>) => Promise<any>; }) => Promise<T> >, ): Promise<T[]> { const logger = this.createOperationLogger('sequential-transactions', { transactionCount: transactionBlocks.length, }); const results: T[] = []; try { for (let i = 0; i < transactionBlocks.length; i++) { logger.debug({ transactionIndex: i }, 'Executing sequential transaction'); const result = await this.transaction(transactionBlocks[i]); results.push(result); } logger.debug('All sequential transactions completed successfully'); return results; } catch (error) { logger.error({ completedTransactions: results.length }, 'Sequential transaction failed'); throw error; } } /** * Check if there are any active transactions */ hasActiveTransactions(): boolean { return this.activeTransactions.size > 0; } /** * Get the number of active transactions */ getActiveTransactionCount(): number { return this.activeTransactions.size; } /** * Force rollback all active transactions (emergency cleanup) */ async forceRollbackAll(): Promise<void> { const logger = this.createOperationLogger('force-rollback-all', { activeTransactionCount: this.activeTransactions.size, }); if (this.activeTransactions.size === 0) { logger.debug('No active transactions to rollback'); return; } logger.warn('Force rolling back all active transactions'); try { const connection = this.connectionManager.getConnection(); await connection.query('ROLLBACK'); this.activeTransactions.clear(); logger.info('All active transactions rolled back'); } catch (error) { logError(logger, error as Error, { operation: 'force-rollback' }); // Clear the set anyway since we can't be sure of the state this.activeTransactions.clear(); throw error; } } /** * Execute a read-only transaction (for queries that don't modify data) */ async readOnlyTransaction<T>( queryBlock: (tx: { executeQuery: (query: string, params?: Record<string, any>) => Promise<any>; }) => Promise<T>, ): Promise<T> { const logger = this.createOperationLogger('read-only-transaction'); // For read-only operations, we can execute without explicit transaction boundaries // but still provide the transaction-like interface if (!this.connectionManager.isConnected()) { await this.connectionManager.initialize(); } const connection = this.connectionManager.getConnection(); const txContext = { executeQuery: async (query: string, params?: Record<string, any>): Promise<any> => { logger.debug({ query, params }, 'Executing read-only query'); if (params && Object.keys(params).length > 0) { const prepared = await connection.prepare(query); return connection.execute(prepared, params); } return connection.query(query); }, }; try { const result = await queryBlock(txContext); logger.debug('Read-only transaction completed successfully'); return result; } catch (error) { logError(logger, error as Error, { operation: 'read-only-transaction' }); throw error; } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/KuzuMem-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server