Skip to main content
Glama

Prisma MCP Server

Official
by prisma
Apache 2.0
4
44,192
  • Linux
  • Apple
transaction-manager.ts12 kB
import { Debug } from '@prisma/debug' import { SqlDriverAdapter, SqlQuery, SqlQueryable, Transaction } from '@prisma/driver-adapter-utils' import { randomUUID } from '../crypto' import { QueryEvent } from '../events' import type { SchemaProvider } from '../schema' import { TracingHelper, withQuerySpanAndEvent } from '../tracing' import { rethrowAsUserFacing } from '../user-facing-error' import { assertNever } from '../utils' import { Options, TransactionInfo } from './transaction' import { InvalidTransactionIsolationLevelError, TransactionClosedError, TransactionExecutionTimeoutError, TransactionInternalConsistencyError, TransactionManagerError, TransactionNotFoundError, TransactionRolledBackError, TransactionStartTimeoutError, } from './transaction-manager-error' const MAX_CLOSED_TRANSACTIONS = 100 type TransactionWrapper = { id: string timer?: NodeJS.Timeout timeout: number | undefined startedAt: number transaction?: Transaction } & TransactionState type TransactionState = | { status: 'waiting' | 'running' | 'committed' | 'rolled_back' | 'timed_out' } | { status: 'closing'; closing: Promise<void>; reason: 'committed' | 'rolled_back' | 'timed_out' } const debug = Debug('prisma:client:transactionManager') const COMMIT_QUERY = (): SqlQuery => ({ sql: 'COMMIT', args: [], argTypes: [] }) const ROLLBACK_QUERY = (): SqlQuery => ({ sql: 'ROLLBACK', args: [], argTypes: [] }) const PHANTOM_COMMIT_QUERY = (): SqlQuery => ({ sql: '-- Implicit "COMMIT" query via underlying driver', args: [], argTypes: [], }) const PHANTOM_ROLLBACK_QUERY = (): SqlQuery => ({ sql: '-- Implicit "ROLLBACK" query via underlying driver', args: [], argTypes: [], }) export class TransactionManager { // The map of active transactions. private transactions: Map<string, TransactionWrapper> = new Map() // List of last closed transactions. Max MAX_CLOSED_TRANSACTIONS entries. // Used to provide better error messages than a generic "transaction not found". private closedTransactions: TransactionWrapper[] = [] private readonly driverAdapter: SqlDriverAdapter private readonly transactionOptions: Options private readonly tracingHelper: TracingHelper readonly #onQuery?: (event: QueryEvent) => void readonly #provider?: SchemaProvider constructor({ driverAdapter, transactionOptions, tracingHelper, onQuery, provider, }: { driverAdapter: SqlDriverAdapter transactionOptions: Options tracingHelper: TracingHelper onQuery?: (event: QueryEvent) => void provider?: SchemaProvider }) { this.driverAdapter = driverAdapter this.transactionOptions = transactionOptions this.tracingHelper = tracingHelper this.#onQuery = onQuery this.#provider = provider } /** * Starts an internal transaction. The difference to `startTransaction` is that it does not * use the default transaction options from the `TransactionManager`, which in practice means * that it does not apply the default timeouts. */ async startInternalTransaction(options?: Options): Promise<TransactionInfo> { const validatedOptions = options !== undefined ? this.#validateOptions(options) : {} return await this.tracingHelper.runInChildSpan('start_transaction', () => this.#startTransactionImpl(validatedOptions), ) } async startTransaction(options?: Options): Promise<TransactionInfo> { const validatedOptions = options !== undefined ? this.#validateOptions(options) : this.transactionOptions return await this.tracingHelper.runInChildSpan('start_transaction', () => this.#startTransactionImpl(validatedOptions), ) } async #startTransactionImpl(options: Options): Promise<TransactionInfo> { const transaction: TransactionWrapper = { id: await randomUUID(), status: 'waiting', timer: undefined, timeout: options.timeout, startedAt: Date.now(), transaction: undefined, } this.transactions.set(transaction.id, transaction) // Start timeout to wait for transaction to be started. let hasTimedOut = false const startTimer = createTimeoutIfDefined(() => (hasTimedOut = true), options.maxWait) startTimer?.unref?.() transaction.transaction = await this.driverAdapter .startTransaction(options.isolationLevel) .catch(rethrowAsUserFacing) clearTimeout(startTimer) // Transaction status might have timed out while waiting for transaction to start. => Check for it! switch (transaction.status) { case 'waiting': if (hasTimedOut) { await this.#closeTransaction(transaction, 'timed_out') throw new TransactionStartTimeoutError() } transaction.status = 'running' // Start timeout to wait for transaction to be finished. transaction.timer = this.#startTransactionTimeout(transaction.id, options.timeout) return { id: transaction.id } case 'timed_out': case 'running': case 'committed': case 'rolled_back': throw new TransactionInternalConsistencyError( `Transaction in invalid state ${transaction.status} although it just finished startup.`, ) default: assertNever(transaction['status'], 'Unknown transaction status.') } } async commitTransaction(transactionId: string): Promise<void> { return await this.tracingHelper.runInChildSpan('commit_transaction', async () => { const txw = this.#getActiveOrClosingTransaction(transactionId, 'commit') await this.#closeTransaction(txw, 'committed') }) } async rollbackTransaction(transactionId: string): Promise<void> { return await this.tracingHelper.runInChildSpan('rollback_transaction', async () => { const txw = this.#getActiveOrClosingTransaction(transactionId, 'rollback') await this.#closeTransaction(txw, 'rolled_back') }) } async getTransaction(txInfo: TransactionInfo, operation: string): Promise<Transaction> { let tx = this.#getActiveOrClosingTransaction(txInfo.id, operation) if (tx.status === 'closing') { await tx.closing // Fetch again to ensure proper error propagation after it's been closed. tx = this.#getActiveOrClosingTransaction(txInfo.id, operation) } if (!tx.transaction) throw new TransactionNotFoundError() return tx.transaction } #getActiveOrClosingTransaction(transactionId: string, operation: string): TransactionWrapper { const transaction = this.transactions.get(transactionId) if (!transaction) { const closedTransaction = this.closedTransactions.find((tx) => tx.id === transactionId) if (closedTransaction) { debug('Transaction already closed.', { transactionId, status: closedTransaction.status }) switch (closedTransaction.status) { case 'closing': case 'waiting': case 'running': throw new TransactionInternalConsistencyError('Active transaction found in closed transactions list.') case 'committed': throw new TransactionClosedError(operation) case 'rolled_back': throw new TransactionRolledBackError(operation) case 'timed_out': throw new TransactionExecutionTimeoutError(operation, { timeout: closedTransaction.timeout!, timeTaken: Date.now() - closedTransaction.startedAt, }) } } else { debug(`Transaction not found.`, transactionId) throw new TransactionNotFoundError() } } if (['committed', 'rolled_back', 'timed_out'].includes(transaction.status)) { throw new TransactionInternalConsistencyError('Closed transaction found in active transactions map.') } return transaction } async cancelAllTransactions(): Promise<void> { // TODO: call `map` on the iterator directly without collecting it into an array first // once we drop support for Node.js 18 and 20. await Promise.allSettled([...this.transactions.values()].map((tx) => this.#closeTransaction(tx, 'rolled_back'))) } #startTransactionTimeout(transactionId: string, timeout: number | undefined): NodeJS.Timeout | undefined { const timeoutStartedAt = Date.now() const timer = createTimeoutIfDefined(async () => { debug('Transaction timed out.', { transactionId, timeoutStartedAt, timeout }) const tx = this.transactions.get(transactionId) if (tx && ['running', 'waiting'].includes(tx.status)) { await this.#closeTransaction(tx, 'timed_out') } else { // Transaction was already committed or rolled back when timeout happened. // Should normally not happen as timeout is cancelled when transaction is committed or rolled back. // No further action needed though. debug('Transaction already committed or rolled back when timeout happened.', transactionId) } }, timeout) timer?.unref?.() return timer } async #closeTransaction(tx: TransactionWrapper, status: 'committed' | 'rolled_back' | 'timed_out'): Promise<void> { const createClosingPromise = async () => { debug('Closing transaction.', { transactionId: tx.id, status }) try { if (tx.transaction && status === 'committed') { if (tx.transaction.options.usePhantomQuery) { await this.#withQuerySpanAndEvent(PHANTOM_COMMIT_QUERY(), tx.transaction, () => tx.transaction!.commit()) } else { const query = COMMIT_QUERY() await this.#withQuerySpanAndEvent(query, tx.transaction, () => tx.transaction!.executeRaw(query)) await tx.transaction.commit() } } else if (tx.transaction) { if (tx.transaction.options.usePhantomQuery) { await this.#withQuerySpanAndEvent(PHANTOM_ROLLBACK_QUERY(), tx.transaction, () => tx.transaction!.rollback(), ) } else { const query = ROLLBACK_QUERY() await this.#withQuerySpanAndEvent(query, tx.transaction, () => tx.transaction!.executeRaw(query)) await tx.transaction.rollback() } } } finally { tx.status = status clearTimeout(tx.timer) tx.timer = undefined this.transactions.delete(tx.id) this.closedTransactions.push(tx) if (this.closedTransactions.length > MAX_CLOSED_TRANSACTIONS) { this.closedTransactions.shift() } } } if (tx.status === 'closing') { await tx.closing // Fetch again to ensure proper error propagation after it's been closed. this.#getActiveOrClosingTransaction(tx.id, status === 'committed' ? 'commit' : 'rollback') } else { await Object.assign(tx, { status: 'closing', reason: status, closing: createClosingPromise(), }).closing } } #validateOptions(options: Options) { // Supplying timeout default values is cared for upstream already. if (!options.timeout) throw new TransactionManagerError('timeout is required') if (!options.maxWait) throw new TransactionManagerError('maxWait is required') // Snapshot level only supported for MS SQL Server, which is not supported via driver adapters so far. if (options.isolationLevel === 'SNAPSHOT') throw new InvalidTransactionIsolationLevelError(options.isolationLevel) return { ...options, timeout: options.timeout, maxWait: options.maxWait, } } #withQuerySpanAndEvent<T>(query: SqlQuery, queryable: SqlQueryable, execute: () => Promise<T>): Promise<T> { return withQuerySpanAndEvent({ query, execute, provider: this.#provider ?? queryable.provider, tracingHelper: this.tracingHelper, onQuery: this.#onQuery, }) } } function createTimeoutIfDefined(cb: () => void, ms: number | undefined): NodeJS.Timeout | undefined { return ms !== undefined ? setTimeout(cb, ms) : undefined }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/prisma/prisma'

If you have feedback or need assistance with the MCP directory API, please join our Discord server