// MCP GitHub Issue Server
// by sammcj
/**
* Manages atomic transactions for task operations
*/
import { Logger } from '../../../logging/index.js';
import { TaskStorage } from '../../../types/storage.js';
import { ErrorCodes, createError } from '../../../errors/index.js';
import {
Transaction,
Operation,
TransactionResult,
TransactionOptions,
DEFAULT_TRANSACTION_OPTIONS,
} from '../../../types/transaction.js';
/**
 * Singleton that tracks in-flight task transactions, commits them through the
 * optional storage backend, and undoes their recorded operations on rollback.
 *
 * Life cycle of a transaction: `begin()` registers it as `pending`,
 * `commit()` or `rollback()` settles it and removes it from the active set.
 * Two timers guard against abandoned transactions: an optional per-transaction
 * timeout (armed in `begin()`) and a periodic sweep that rolls back anything
 * older than 30 minutes.
 */
export class TransactionManager {
  /** How often the stale-transaction sweep runs. */
  private static readonly CLEANUP_INTERVAL_MS = 60 * 1000;
  /** Age after which a still-active transaction is treated as stale. */
  private static readonly STALE_TRANSACTION_MS = 30 * 60 * 1000;

  private static instance: TransactionManager | null = null;

  private readonly logger: Logger;
  private activeTransactions: Map<string, Transaction>;
  private transactionCounter: number;
  private transactionTimeouts: Map<string, NodeJS.Timeout>;
  /** Handle of the periodic sweep, kept so that cleanup() can stop it. */
  private readonly cleanupTimer: NodeJS.Timeout;

  private constructor(private readonly storage?: TaskStorage) {
    this.logger = Logger.getInstance().child({ component: 'TaskTransactionManager' });
    this.activeTransactions = new Map();
    this.transactionTimeouts = new Map();
    this.transactionCounter = 0;

    // Periodic cleanup of stale transactions
    this.cleanupTimer = setInterval(() => {
      this.cleanupStaleTransactions().catch(error =>
        this.logger.error('Failed to clean up stale transactions', { error })
      );
    }, TransactionManager.CLEANUP_INTERVAL_MS);
    // Housekeeping must not be the only thing keeping the process alive.
    this.cleanupTimer.unref();
  }

  /**
   * Returns the shared manager, creating it on first use.
   *
   * @param storage Backend used by the instance. Only honoured by the call
   *   that creates the instance; later calls keep the existing backend.
   */
  static getInstance(storage?: TaskStorage): TransactionManager {
    if (!TransactionManager.instance) {
      TransactionManager.instance = new TransactionManager(storage);
    } else if (storage !== undefined && storage !== TransactionManager.instance.storage) {
      TransactionManager.instance.logger.warn(
        'TransactionManager already initialized; ignoring storage argument'
      );
    }
    return TransactionManager.instance;
  }

  /**
   * Reset the singleton instance (useful for cleanup)
   */
  static resetInstance(): void {
    if (TransactionManager.instance) {
      TransactionManager.instance.cleanup();
      TransactionManager.instance = null;
    }
  }

  /**
   * Stops all timers, starts a rollback for every active transaction and
   * empties the bookkeeping maps. Rollbacks are fire-and-forget because this
   * method is synchronous; failures are logged.
   */
  private cleanup(): void {
    clearInterval(this.cleanupTimer);

    // Snapshot first: rollback() removes entries from the map as it completes.
    for (const [id, transaction] of [...this.activeTransactions.entries()]) {
      void this.rollback(transaction).catch(error =>
        this.logger.error('Failed to rollback transaction during cleanup', {
          error,
          transactionId: id,
        })
      );
    }

    for (const timeout of this.transactionTimeouts.values()) {
      clearTimeout(timeout);
    }
    this.activeTransactions.clear();
    this.transactionTimeouts.clear();
    this.transactionCounter = 0;
  }

  /**
   * Rolls back every active transaction older than the stale threshold.
   * A transaction whose rollback fails is dropped so that it is not retried
   * on every sweep.
   */
  private async cleanupStaleTransactions(): Promise<void> {
    const now = Date.now();
    for (const [id, transaction] of [...this.activeTransactions.entries()]) {
      const age = now - transaction.timestamp;
      if (age <= TransactionManager.STALE_TRANSACTION_MS) continue;
      // May have been settled while an earlier rollback was being awaited.
      if (!this.activeTransactions.has(id)) continue;

      this.logger.warn('Found stale transaction', { transactionId: id, age });
      const result = await this.rollback(transaction);
      if (!result.success) {
        this.logger.error('Failed to rollback stale transaction', {
          error: result.error,
          transactionId: id,
        });
        this.discardTransaction(id);
      }
    }
  }

  /**
   * Begins a new transaction
   *
   * @param options Overrides merged on top of DEFAULT_TRANSACTION_OPTIONS.
   * @returns The newly registered transaction in `pending` state.
   * @throws A TRANSACTION_ERROR when locking or the storage-level begin fails.
   */
  async begin(options: TransactionOptions = {}): Promise<Transaction> {
    const mergedOptions = { ...DEFAULT_TRANSACTION_OPTIONS, ...options };
    const id = this.generateTransactionId();
    const transaction: Transaction = {
      id,
      operations: [],
      timestamp: Date.now(),
      status: 'pending',
      timeout: mergedOptions.timeout,
      metadata: {
        retryCount: 0,
      },
    };

    // Set up transaction timeout
    if (mergedOptions.timeout) {
      const timeoutHandle = setTimeout(() => {
        this.handleTransactionTimeout(id).catch(error => {
          this.logger.error('Failed to handle transaction timeout', {
            error,
            transactionId: id,
          });
        });
      }, mergedOptions.timeout);
      this.transactionTimeouts.set(id, timeoutHandle);
    }

    try {
      // Acquire lock if required
      if (mergedOptions.requireLock) {
        await this.acquireLock(id);
      }
      // Start storage-level transaction if storage supports it
      if (this.storage && 'beginTransaction' in this.storage) {
        await this.storage.beginTransaction();
      }
      this.activeTransactions.set(id, transaction);
      this.logger.debug('Transaction started', {
        transactionId: id,
        timestamp: transaction.timestamp,
        options: mergedOptions,
      });
      return transaction;
    } catch (error) {
      // The transaction never became active, so its timer must not linger.
      this.clearTransactionTimeout(id);
      this.logger.error('Failed to begin transaction', {
        error,
        transactionId: id,
      });
      throw createError(
        ErrorCodes.TRANSACTION_ERROR,
        'Failed to begin transaction',
        'TransactionManager.begin',
        undefined,
        { error: String(error) }
      );
    }
  }

  /**
   * Commits a transaction
   *
   * Transient storage errors are retried (up to the default retry limit, with
   * the default retry delay between attempts) while `retryOnError` is true.
   * Any other failure rolls the transaction back.
   *
   * @param transaction Transaction previously returned by `begin()`.
   * @param retryOnError Whether transient errors should be retried.
   * @returns A result object; failures are reported through `success: false`
   *   and `error` rather than by throwing.
   */
  async commit(transaction: Transaction, retryOnError: boolean = true): Promise<TransactionResult> {
    const startTime = Date.now();
    const retryLimit = DEFAULT_TRANSACTION_OPTIONS.retryLimit ?? 0;

    for (;;) {
      // Re-validated on every attempt: the transaction may have been settled
      // by someone else while we were waiting to retry.
      try {
        this.validateTransactionState(transaction);
      } catch (error) {
        // Unknown or already-settled transaction: nothing of ours to roll back.
        this.logger.error('Failed to commit transaction', {
          error,
          transactionId: transaction.id,
        });
        return {
          success: false,
          transactionId: transaction.id,
          error: TransactionManager.toError(error),
        };
      }

      // Clear timeout if exists
      this.clearTransactionTimeout(transaction.id);

      try {
        // Persist and commit transaction
        if (this.storage && 'commitTransaction' in this.storage) {
          await this.persistTransaction(transaction);
          await this.storage.commitTransaction();
        }
        transaction.status = 'committed';
        this.activeTransactions.delete(transaction.id);

        const duration = Date.now() - startTime;
        this.logger.debug('Transaction committed', {
          transactionId: transaction.id,
          operationCount: transaction.operations.length,
          duration,
        });
        return {
          success: true,
          transactionId: transaction.id,
          metadata: {
            duration,
            retryCount: transaction.metadata?.retryCount,
          },
        };
      } catch (error) {
        // Retry logic for transient errors
        const metadata = transaction.metadata;
        const retryCount = metadata?.retryCount ?? 0;
        if (retryOnError && metadata && retryCount < retryLimit && this.isRetryableError(error)) {
          metadata.retryCount = retryCount + 1;
          await new Promise(resolve => setTimeout(resolve, DEFAULT_TRANSACTION_OPTIONS.retryDelay));
          continue;
        }

        this.logger.error('Failed to commit transaction', {
          error,
          transactionId: transaction.id,
        });
        await this.rollback(transaction);
        return {
          success: false,
          transactionId: transaction.id,
          error: TransactionManager.toError(error),
        };
      }
    }
  }

  /**
   * Rolls back a transaction
   *
   * Rolls back the storage-level transaction (when supported), then undoes
   * the recorded operations newest-first. A transaction that is not active is
   * reported as a failure without touching storage.
   *
   * @returns A result object; failures are reported through `success: false`
   *   and `error` rather than by throwing.
   */
  async rollback(transaction: Transaction): Promise<TransactionResult> {
    if (!this.activeTransactions.has(transaction.id)) {
      const error = createError(
        ErrorCodes.INVALID_STATE,
        `Transaction ${transaction.id} not found`,
        'TransactionManager.rollback'
      );
      this.logger.error('Failed to rollback transaction', {
        error,
        transactionId: transaction.id,
      });
      return {
        success: false,
        transactionId: transaction.id,
        error: TransactionManager.toError(error),
      };
    }

    let storageRolledBack = false;
    try {
      // Rollback storage-level transaction first
      if (this.storage && 'rollbackTransaction' in this.storage) {
        await this.storage.rollbackTransaction();
        storageRolledBack = true;
      }
      // Then reverse operations in reverse order
      for (const operation of [...transaction.operations].reverse()) {
        await this.rollbackOperation(operation);
      }
      transaction.status = 'rolled_back';
      this.activeTransactions.delete(transaction.id);
      // The transaction is settled; its timeout has nothing left to guard.
      this.clearTransactionTimeout(transaction.id);

      this.logger.debug('Transaction rolled back', {
        transactionId: transaction.id,
        operationCount: transaction.operations.length,
      });
      return {
        success: true,
        transactionId: transaction.id,
      };
    } catch (error) {
      this.logger.error('Failed to rollback transaction', {
        error,
        transactionId: transaction.id,
      });
      // Even if application-level rollback fails, ensure storage transaction
      // is rolled back - but do not issue a second rollback if the first one
      // already went through.
      if (!storageRolledBack && this.storage && 'rollbackTransaction' in this.storage) {
        try {
          await this.storage.rollbackTransaction();
        } catch (rollbackError) {
          this.logger.error('Failed to rollback storage transaction', {
            error: rollbackError,
            transactionId: transaction.id,
          });
        }
      }
      return {
        success: false,
        transactionId: transaction.id,
        error: TransactionManager.toError(error),
      };
    }
  }

  /**
   * Gets active transaction by ID
   */
  getTransaction(id: string): Transaction | undefined {
    return this.activeTransactions.get(id);
  }

  /**
   * Generates a unique transaction ID
   * from the current time, a per-instance counter and a random suffix.
   */
  private generateTransactionId(): string {
    this.transactionCounter++;
    const suffix = Math.random().toString(36).slice(2, 11);
    return `txn_${Date.now()}_${this.transactionCounter}_${suffix}`;
  }

  /**
   * Timer callback: rolls back a transaction that outlived its timeout and
   * drops it from the active set if that rollback fails.
   */
  private async handleTransactionTimeout(transactionId: string): Promise<void> {
    // The timer has fired, so its handle is spent either way.
    this.clearTransactionTimeout(transactionId);

    const transaction = this.activeTransactions.get(transactionId);
    if (!transaction) return;

    this.logger.warn('Transaction timeout', {
      transactionId,
      duration: Date.now() - transaction.timestamp,
    });

    const result = await this.rollback(transaction);
    if (!result.success) {
      this.logger.error('Failed to rollback timed out transaction', {
        error: result.error,
        transactionId,
      });
      // Force cleanup even if rollback fails
      this.discardTransaction(transactionId);
    }
  }

  /** Cancels and forgets the timeout timer of a transaction, if one is armed. */
  private clearTransactionTimeout(transactionId: string): void {
    const timeout = this.transactionTimeouts.get(transactionId);
    if (timeout !== undefined) {
      clearTimeout(timeout);
      this.transactionTimeouts.delete(transactionId);
    }
  }

  /** Removes a transaction and its timer from the bookkeeping without rolling it back. */
  private discardTransaction(transactionId: string): void {
    this.activeTransactions.delete(transactionId);
    this.clearTransactionTimeout(transactionId);
  }

  /**
   * Ensures the transaction is active and still pending.
   *
   * @throws An INVALID_STATE error otherwise.
   */
  private validateTransactionState(transaction: Transaction): void {
    if (!this.activeTransactions.has(transaction.id)) {
      throw createError(
        ErrorCodes.INVALID_STATE,
        `Transaction ${transaction.id} not found`,
        'TransactionManager.validateTransactionState'
      );
    }
    if (transaction.status !== 'pending') {
      throw createError(
        ErrorCodes.INVALID_STATE,
        `Transaction ${transaction.id} is already ${transaction.status}`,
        'TransactionManager.validateTransactionState'
      );
    }
  }

  /**
   * Placeholder for a real locking mechanism; currently it only logs.
   */
  private async acquireLock(transactionId: string): Promise<void> {
    // Implement distributed locking mechanism here
    // Could use Redis, ZooKeeper, or other lock service
    this.logger.debug('Lock acquired', { transactionId });
  }

  /**
   * Tells whether a commit failure looks transient: an SQLITE_BUSY or
   * SQLITE_LOCKED code, or a message mentioning a deadlock. Values that are
   * not objects are never retryable.
   */
  private isRetryableError(error: unknown): boolean {
    if (typeof error !== 'object' || error === null) {
      return false;
    }
    const { code, message } = error as { code?: unknown; message?: unknown };
    return (
      code === 'SQLITE_BUSY' ||
      code === 'SQLITE_LOCKED' ||
      (typeof message === 'string' && message.includes('deadlock'))
    );
  }

  /**
   * Persists a transaction to storage
   * (placeholder: currently it only logs when a storage backend is present).
   */
  private async persistTransaction(transaction: Transaction): Promise<void> {
    if (!this.storage) return;
    try {
      // Implementation depends on storage interface
      // Could store in a transactions table or log
      this.logger.debug('Transaction persisted', {
        transactionId: transaction.id,
      });
    } catch (error) {
      this.logger.error('Failed to persist transaction', {
        error,
        transactionId: transaction.id,
      });
      throw error;
    }
  }

  /**
   * Rolls back a single operation
   * by applying its inverse to storage; errors are logged and rethrown.
   */
  private async rollbackOperation(operation: Operation): Promise<void> {
    if (!this.storage) return;
    try {
      switch (operation.type) {
        case 'delete':
          // Restore deleted tasks
          if (operation.tasks && operation.tasks.length > 0) {
            await this.storage.saveTasks(operation.tasks);
          }
          break;
        case 'update':
          // Revert task to previous state
          if (operation.previousState && operation.path) {
            await this.storage.updateTask(operation.path, operation.previousState);
          }
          break;
        case 'create':
          // Delete created task
          if (operation.task) {
            await this.storage.deleteTasks([operation.task.path]);
          }
          break;
      }
    } catch (error) {
      this.logger.error('Failed to rollback operation', {
        error,
        operationType: operation.type,
      });
      throw error;
    }
  }

  /** Normalises an arbitrary thrown value into an Error instance. */
  private static toError(error: unknown): Error {
    return error instanceof Error ? error : new Error(String(error));
  }
}