Skip to main content
Glama
ooples

MCP Console Automation Server

SFTPProtocol.ts.original38.4 kB
import { EventEmitter } from 'events'; import { Client as SSHClient } from 'ssh2'; import SFTPClient from 'ssh2-sftp-client'; import { promises as fs, createReadStream, createWriteStream, Stats } from 'fs'; import { join, dirname, basename, resolve } from 'path'; import { pipeline } from 'stream/promises'; import { createHash } from 'crypto'; import { Transform, Readable, Writable } from 'stream'; import PQueue from 'p-queue'; import { v4 as uuidv4 } from 'uuid'; // Type alias for ssh2-sftp-client FileInfoType type FileInfoType = "d" | "-" | "l"; import { SFTPTransferOptions, SFTPFileInfo, SFTPDirectoryListing, SFTPTransferProgress, SFTPBatchTransfer, SFTPSessionOptions, SFTPDirectoryOperations, SFTPFileOperations, SFTPSyncOptions, SFTPSyncResult, SFTPConnectionState, SCPTransferOptions, FileTransferSession } from '../types/index.js'; import { Logger } from '../utils/logger.js'; import { RetryManager } from '../core/RetryManager.js'; import { ErrorRecovery } from '../core/ErrorRecovery.js'; /** * Production-ready SFTP/SCP Protocol Implementation * * Features: * - Full SFTP subsystem support on top of SSH * - Concurrent file transfers with intelligent queuing * - Transfer resume capability for interrupted transfers * - Progress tracking with real-time bandwidth monitoring * - Batch operations with priority handling * - Directory synchronization with conflict resolution * - Comprehensive error handling and recovery * - Bandwidth throttling and adaptive compression * - File integrity verification with checksums * - Connection pooling and keep-alive management */ export class SFTPProtocol extends EventEmitter { private sshClient: SSHClient | null = null; private sftpClient: SFTPClient | null = null; private sessionId: string; private options: SFTPSessionOptions; private connectionState: SFTPConnectionState; private transferQueue: PQueue; private batchQueue: PQueue; private activeTransfers: Map<string, SFTPTransferProgress>; private activeBatches: Map<string, SFTPBatchTransfer>; private resumeData: Map<string, any>; private bandwidthController: BandwidthController; private logger: Logger; private retryManager: RetryManager; private errorRecovery: ErrorRecovery; private keepAliveTimer: NodeJS.Timeout | null = null; private healthCheckTimer: NodeJS.Timeout | null = null; private isConnected: boolean = false; private isConnecting: boolean = false; // Directory and file operations public readonly directories: SFTPDirectoryOperations; public readonly files: SFTPFileOperations; constructor(sessionId: string, options: SFTPSessionOptions) { super(); this.sessionId = sessionId; this.options = { maxConcurrentTransfers: 3, transferQueue: { maxSize: 100, priorityLevels: 4, timeoutMs: 300000 }, bandwidth: { adaptiveThrottling: true }, compressionLevel: 6, keepAlive: { enabled: true, interval: 30000, maxMissed: 3 }, ...options }; this.logger = new Logger(`SFTPProtocol-${sessionId}`); this.retryManager = new RetryManager(); this.errorRecovery = new ErrorRecovery(); // Initialize queues with concurrency limits this.transferQueue = new PQueue({ concurrency: this.options.maxConcurrentTransfers || 3, timeout: this.options.transferQueue?.timeoutMs || 300000, throwOnTimeout: true }); this.batchQueue = new PQueue({ concurrency: 2, timeout: 600000 }); // Initialize state tracking this.activeTransfers = new Map(); this.activeBatches = new Map(); this.resumeData = new Map(); // Initialize bandwidth controller this.bandwidthController = new BandwidthController(this.options.bandwidth || {}); // Initialize connection state this.connectionState = { sessionId, isConnected: false, connectionTime: new Date(), lastActivity: new Date(), serverInfo: { version: '', extensions: [], limits: { maxPacketLength: 32768, maxReadLength: 32768, maxWriteLength: 32768 } }, activeTransfers: 0, queuedTransfers: 0, transferStats: { totalUploads: 0, totalDownloads: 0, totalBytes: 0, averageSpeed: 0 } }; // Initialize operation interfaces this.directories = this.createDirectoryOperations(); this.files = this.createFileOperations(); this.setupErrorHandling(); this.setupEventHandlers(); } /** * Establish SFTP connection over SSH */ async connect(): Promise<void> { if (this.isConnected) { return; } if (this.isConnecting) { return new Promise((resolve, reject) => { this.once('connected', resolve); this.once('error', reject); }); } this.isConnecting = true; try { await this.retryManager.executeWithRetry( () => this.establishConnection(), { sessionId: this.sessionId, operationName: 'sftp_connect', strategyName: 'sftp', context: { host: this.options.host, port: this.options.port } } ); this.isConnected = true; this.isConnecting = false; this.connectionState.isConnected = true; this.connectionState.connectionTime = new Date(); this.startKeepAlive(); this.startHealthCheck(); this.emit('connected', this.connectionState); this.logger.info('SFTP connection established successfully'); } catch (error) { this.isConnecting = false; this.logger.error('Failed to establish SFTP connection:', error); this.emit('error', error); throw error; } } /** * Close SFTP connection and cleanup resources */ async disconnect(): Promise<void> { this.logger.info('Disconnecting SFTP session'); // Stop keep-alive and health checks if (this.keepAliveTimer) { clearInterval(this.keepAliveTimer); this.keepAliveTimer = null; } if (this.healthCheckTimer) { clearInterval(this.healthCheckTimer); this.healthCheckTimer = null; } // Cancel active transfers this.activeTransfers.forEach((progress, transferId) => { progress.status = 'cancelled'; this.emit('transfer-cancelled', progress); }); // Clear queues this.transferQueue.clear(); this.batchQueue.clear(); // Close SFTP connection if (this.sftpClient) { try { await this.sftpClient.end(); } catch (error) { this.logger.warn('Error closing SFTP client:', error); } this.sftpClient = null; } // Close SSH connection if (this.sshClient) { this.sshClient.end(); this.sshClient = null; } this.isConnected = false; this.connectionState.isConnected = false; this.emit('disconnected'); } /** * Upload a single file with progress tracking */ async uploadFile( localPath: string, remotePath: string, options: Partial<SFTPTransferOptions> = {} ): Promise<SFTPTransferProgress> { const transferId = uuidv4(); const progress: SFTPTransferProgress = { transferId, sessionId: this.sessionId, operation: 'upload', status: 'queued', source: localPath, destination: remotePath, totalBytes: 0, transferredBytes: 0, percentage: 0, speed: 0, eta: 0, startTime: new Date() }; this.activeTransfers.set(transferId, progress); this.emit('transfer-queued', progress); await this.transferQueue.add( async () => this.executeUpload(progress, options), { priority: this.getPriority(options.priority) } ); return progress; } /** * Download a single file with progress tracking */ async downloadFile( remotePath: string, localPath: string, options: Partial<SFTPTransferOptions> = {} ): Promise<SFTPTransferProgress> { const transferId = uuidv4(); const progress: SFTPTransferProgress = { transferId, sessionId: this.sessionId, operation: 'download', status: 'queued', source: remotePath, destination: localPath, totalBytes: 0, transferredBytes: 0, percentage: 0, speed: 0, eta: 0, startTime: new Date() }; this.activeTransfers.set(transferId, progress); this.emit('transfer-queued', progress); await this.transferQueue.add( async () => this.executeDownload(progress, options), { priority: this.getPriority(options.priority) } ); return progress; } /** * Execute batch transfer operations */ async batchTransfer(transfers: SFTPTransferOptions[]): Promise<SFTPBatchTransfer> { const batchId = uuidv4(); const batch: SFTPBatchTransfer = { batchId, sessionId: this.sessionId, transfers, concurrency: Math.min(transfers.length, this.options.maxConcurrentTransfers || 3), status: 'pending', progress: [], totalTransfers: transfers.length, completedTransfers: 0, failedTransfers: 0, totalBytes: 0, transferredBytes: 0, overallProgress: 0, startTime: new Date(), errors: [] }; this.activeBatches.set(batchId, batch); this.emit('batch-queued', batch); // Queue batch processing this.batchQueue.add(() => this.executeBatch(batch)); return batch; } /** * Synchronize directories with comprehensive options */ async synchronizeDirectories( localPath: string, remotePath: string, options: SFTPSyncOptions ): Promise<SFTPSyncResult> { const syncId = uuidv4(); this.logger.info(`Starting directory sync: ${localPath} <-> ${remotePath}`); const result: SFTPSyncResult = { syncId, sessionId: this.sessionId, options, summary: { filesTransferred: 0, filesSkipped: 0, filesDeleted: 0, directoriesCreated: 0, directoriesDeleted: 0, totalBytes: 0, transferTime: 0, conflicts: 0 }, details: { transferred: [], skipped: [], deleted: [], conflicts: [] }, errors: [], warnings: [] }; const startTime = Date.now(); try { if (options.dryRun) { this.logger.info('Performing dry run synchronization'); } // Analyze directory structures const localFiles = await this.analyzeDirectory(localPath, options); const remoteFiles = await this.analyzeRemoteDirectory(remotePath, options); // Generate sync plan const syncPlan = this.generateSyncPlan(localFiles, remoteFiles, options); // Execute sync plan await this.executeSyncPlan(syncPlan, result, options); result.summary.transferTime = Date.now() - startTime; this.emit('sync-completed', result); } catch (error) { result.errors.push(error instanceof Error ? error.message : String(error)); this.emit('sync-error', { syncId, error }); throw error; } return result; } /** * Resume interrupted transfer */ async resumeTransfer(transferId: string): Promise<SFTPTransferProgress> { const resumeInfo = this.resumeData.get(transferId); if (!resumeInfo) { throw new Error(`No resume data found for transfer: ${transferId}`); } const progress: SFTPTransferProgress = { ...resumeInfo, status: 'resumed', startTime: new Date() }; this.activeTransfers.set(transferId, progress); this.emit('transfer-resumed', progress); await this.transferQueue.add( async () => this.executeResume(progress, resumeInfo), { priority: this.getPriority('high') } ); return progress; } /** * Get current connection state and statistics */ getConnectionState(): SFTPConnectionState { this.connectionState.activeTransfers = this.activeTransfers.size; this.connectionState.queuedTransfers = this.transferQueue.size; this.connectionState.lastActivity = new Date(); return { ...this.connectionState }; } /** * Cancel active transfer */ async cancelTransfer(transferId: string): Promise<void> { const progress = this.activeTransfers.get(transferId); if (!progress) { throw new Error(`Transfer not found: ${transferId}`); } progress.status = 'cancelled'; this.activeTransfers.delete(transferId); this.emit('transfer-cancelled', progress); } /** * Pause active transfer */ async pauseTransfer(transferId: string): Promise<void> { const progress = this.activeTransfers.get(transferId); if (!progress) { throw new Error(`Transfer not found: ${transferId}`); } progress.status = 'paused'; // Save resume data this.resumeData.set(transferId, progress); this.emit('transfer-paused', progress); } // Private implementation methods private async establishConnection(): Promise<void> { this.sshClient = new SSHClient(); this.sftpClient = new SFTPClient(); return new Promise((resolve, reject) => { const timeout = setTimeout(() => { reject(new Error('SSH connection timeout')); }, this.options.timeout || 30000); this.sshClient!.on('ready', async () => { clearTimeout(timeout); try { // Initialize SFTP subsystem await this.sftpClient!.connect({ sock: this.sshClient!, // Pass through SSH connection options ...this.options } as any); // Get server information await this.gatherServerInfo(); resolve(); } catch (error) { reject(error); } }); this.sshClient!.on('error', (error) => { clearTimeout(timeout); reject(error); }); // Convert SFTPSessionOptions to ssh2 ConnectConfig const connectConfig = { ...this.options, debug: this.options.debug ? (info: string) => console.log(`SSH Debug: ${info}`) : undefined }; this.sshClient!.connect(connectConfig); }); } private async gatherServerInfo(): Promise<void> { try { if (!this.sftpClient) return; // Set basic server info - ssh2-sftp-client doesn't expose version this.connectionState.serverInfo.version = 'unknown'; // Test server capabilities try { await this.sftpClient.list('/'); this.connectionState.serverInfo.extensions.push('directory-listing'); } catch (error) { // Directory listing not available } // Test other capabilities this.connectionState.serverInfo.extensions.push('file-transfer'); } catch (error) { this.logger.warn('Could not gather complete server information:', error); } } private async executeUpload( progress: SFTPTransferProgress, options: Partial<SFTPTransferOptions> ): Promise<SFTPTransferProgress> { if (!this.sftpClient) { throw new Error('SFTP client not connected'); } progress.status = 'preparing'; this.emit('transfer-started', progress); try { // Get file stats const stats = await fs.stat(progress.source); progress.totalBytes = stats.size; // Create remote directory if needed if (options.createDirectories) { const remoteDir = dirname(progress.destination); try { await this.sftpClient.mkdir(remoteDir, true); } catch (error) { // Directory might already exist } } // Setup progress tracking const progressTracker = this.createProgressTracker(progress); // Execute upload with streaming progress.status = 'transferring'; const transferOptions = this.convertTransferOptions(options); transferOptions.writeStreamOptions = { ...transferOptions.writeStreamOptions, ...progressTracker }; await this.sftpClient.put( progress.source, progress.destination, transferOptions ); // Verify transfer if requested if (options.checksumVerification) { progress.status = 'verifying'; const verified = await this.verifyTransfer(progress.source, progress.destination); progress.checksumMatch = verified; } progress.status = 'completed'; progress.endTime = new Date(); progress.percentage = 100; this.updateConnectionStats('upload', progress.totalBytes); this.emit('transfer-completed', progress); } catch (error) { progress.status = 'failed'; progress.error = error instanceof Error ? error.message : String(error); progress.endTime = new Date(); // Save resume data for retryable transfers if (options.resumeSupport && this.isRetryableError(error)) { this.resumeData.set(progress.transferId, progress); } this.emit('transfer-failed', progress); throw error; } finally { this.activeTransfers.delete(progress.transferId); } return progress; } private async executeDownload( progress: SFTPTransferProgress, options: Partial<SFTPTransferOptions> ): Promise<SFTPTransferProgress> { if (!this.sftpClient) { throw new Error('SFTP client not connected'); } progress.status = 'preparing'; this.emit('transfer-started', progress); try { // Get remote file stats const stats = await this.sftpClient.stat(progress.source); progress.totalBytes = stats.size; // Create local directory if needed if (options.createDirectories) { const localDir = dirname(progress.destination); await fs.mkdir(localDir, { recursive: true }); } // Setup progress tracking const progressTracker = this.createProgressTracker(progress); // Execute download with streaming progress.status = 'transferring'; const transferOptions = this.convertTransferOptions(options); transferOptions.readStreamOptions = { ...transferOptions.readStreamOptions, ...progressTracker }; await this.sftpClient.get( progress.source, progress.destination, transferOptions ); // Verify transfer if requested if (options.checksumVerification) { progress.status = 'verifying'; const verified = await this.verifyTransfer(progress.destination, progress.source); progress.checksumMatch = verified; } progress.status = 'completed'; progress.endTime = new Date(); progress.percentage = 100; this.updateConnectionStats('download', progress.totalBytes); this.emit('transfer-completed', progress); } catch (error) { progress.status = 'failed'; progress.error = error instanceof Error ? error.message : String(error); progress.endTime = new Date(); // Save resume data for retryable transfers if (options.resumeSupport && this.isRetryableError(error)) { this.resumeData.set(progress.transferId, progress); } this.emit('transfer-failed', progress); throw error; } finally { this.activeTransfers.delete(progress.transferId); } return progress; } private async executeBatch(batch: SFTPBatchTransfer): Promise<void> { batch.status = 'running'; batch.startTime = new Date(); this.emit('batch-started', batch); const concurrentQueue = new PQueue({ concurrency: batch.concurrency }); try { // Calculate total bytes for (const transfer of batch.transfers) { try { const stats = await this.getTransferSize(transfer); batch.totalBytes += stats; } catch (error) { // Continue with other transfers } } // Execute transfers const promises = batch.transfers.map(transfer => concurrentQueue.add(async () => { try { const progress = await this.executeTransfer(transfer); batch.progress.push(progress); batch.completedTransfers++; batch.transferredBytes += progress.transferredBytes; this.updateBatchProgress(batch); } catch (error) { batch.failedTransfers++; batch.errors.push(error instanceof Error ? error.message : String(error)); this.updateBatchProgress(batch); } }) ); await Promise.allSettled(promises); batch.status = batch.failedTransfers > 0 ? 'failed' : 'completed'; batch.endTime = new Date(); this.emit('batch-completed', batch); } catch (error) { batch.status = 'failed'; batch.endTime = new Date(); batch.errors.push(error instanceof Error ? error.message : String(error)); this.emit('batch-failed', batch); } finally { this.activeBatches.delete(batch.batchId); } } private async executeTransfer(transfer: SFTPTransferOptions): Promise<SFTPTransferProgress> { // Determine operation type based on paths const isUpload = this.isLocalPath(transfer.source); if (isUpload) { return this.uploadFile(transfer.source, transfer.destination, transfer); } else { return this.downloadFile(transfer.source, transfer.destination, transfer); } } private createProgressTracker(progress: SFTPTransferProgress): any { const startTime = Date.now(); let lastUpdate = startTime; let lastBytes = 0; return { // For writable streams (uploads) transform: new Transform({ transform(chunk: Buffer, encoding: BufferEncoding, callback: (error?: Error | null, data?: any) => void) { progress.transferredBytes += chunk.length; const now = Date.now(); if (now - lastUpdate > 1000) { // Update every second const elapsed = (now - startTime) / 1000; const bytesSinceLastUpdate = progress.transferredBytes - lastBytes; const timeSinceLastUpdate = (now - lastUpdate) / 1000; progress.percentage = (progress.transferredBytes / progress.totalBytes) * 100; progress.speed = bytesSinceLastUpdate / timeSinceLastUpdate; progress.eta = progress.speed > 0 ? (progress.totalBytes - progress.transferredBytes) / progress.speed : 0; lastUpdate = now; lastBytes = progress.transferredBytes; this.emit('transfer-progress', progress); } callback(null, chunk); } }), // For readable streams (downloads) highWaterMark: 64 * 1024, // 64KB chunks objectMode: false }; } private async verifyTransfer(localPath: string, remotePath: string): Promise<boolean> { try { const localChecksum = await this.calculateFileChecksum(localPath); const remoteChecksum = await this.calculateRemoteChecksum(remotePath); return localChecksum === remoteChecksum; } catch (error) { this.logger.warn('Transfer verification failed:', error); return false; } } private async calculateFileChecksum(filePath: string, algorithm: string = 'sha256'): Promise<string> { const hash = createHash(algorithm); const stream = createReadStream(filePath); for await (const chunk of stream) { hash.update(chunk); } return hash.digest('hex'); } private async calculateRemoteChecksum(remotePath: string, algorithm: string = 'sha256'): Promise<string> { // This would require executing a checksum command on the remote server // Implementation depends on server capabilities throw new Error('Remote checksum calculation not implemented'); } private convertTransferOptions(options: Partial<SFTPTransferOptions>): SFTPClient.TransferOptions { return { writeStreamOptions: { flags: options.overwrite ? 'w' : 'a', // ssh2-sftp-client only supports 'w' or 'a' mode: options.preservePermissions ? undefined : 0o644, }, readStreamOptions: { flags: 'r', } }; } private createDirectoryOperations(): SFTPDirectoryOperations { return { create: async (path: string, mode?: number, recursive?: boolean) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); await this.sftpClient.mkdir(path, recursive || false); }, remove: async (path: string, recursive?: boolean) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); if (recursive) { await this.sftpClient.rmdir(path, true); } else { await this.sftpClient.rmdir(path); } }, list: async (path: string, detailed?: boolean): Promise<SFTPDirectoryListing> => { if (!this.sftpClient) throw new Error('SFTP client not connected'); const items = await this.sftpClient.list(path, detailed ? undefined : ((item: SFTPClient.FileInfo) => item.type !== 'd')); const listing: SFTPDirectoryListing = { path, files: [], directories: [], symlinks: [], totalSize: 0, totalFiles: 0, totalDirectories: 0 }; for (const item of items) { const fileInfo: SFTPFileInfo = { path: join(path, item.name), name: item.name, type: this.mapFileType(item.type), size: item.size, mode: item.rights ? this.parseRights(item.rights) : 0, uid: item.owner || 0, gid: item.group || 0, atime: new Date(item.accessTime || 0), mtime: new Date(item.modifyTime || 0), isReadable: true, // Would need to check actual permissions isWritable: true, isExecutable: false }; if (fileInfo.type === 'file') { listing.files.push(fileInfo); listing.totalFiles++; listing.totalSize += fileInfo.size; } else if (fileInfo.type === 'directory') { listing.directories.push(fileInfo); listing.totalDirectories++; } else if (fileInfo.type === 'symlink') { listing.symlinks.push(fileInfo); } } return listing; }, exists: async (path: string): Promise<false | FileInfoType> => { if (!this.sftpClient) throw new Error('SFTP client not connected'); return await this.sftpClient.exists(path); }, chmod: async (path: string, mode: number) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); await this.sftpClient.chmod(path, mode); }, chown: async (path: string, uid: number, gid: number) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); // Note: chown might not be supported by all SFTP servers throw new Error('chown not implemented - not universally supported by SFTP servers'); }, stat: async (path: string): Promise<SFTPFileInfo> => { if (!this.sftpClient) throw new Error('SFTP client not connected'); const stats = await this.sftpClient.stat(path); return { path, name: basename(path), type: stats.isDirectory ? 'directory' : stats.isFile ? 'file' : stats.isSymbolicLink ? 'symlink' : 'special', size: stats.size, mode: stats.mode || 0, uid: stats.uid || 0, gid: stats.gid || 0, atime: new Date(stats.accessTime || 0), mtime: new Date(stats.modifyTime || 0), isReadable: true, isWritable: true, isExecutable: false }; }, realpath: async (path: string) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); return await this.sftpClient.realPath(path); }, readlink: async (path: string) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); // Not directly supported by ssh2-sftp-client throw new Error('readlink not implemented'); }, symlink: async (target: string, path: string) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); // Not directly supported by ssh2-sftp-client throw new Error('symlink not implemented'); } }; } private createFileOperations(): SFTPFileOperations { return { upload: async (localPath: string, remotePath: string, options?: Partial<SFTPTransferOptions>) => { return this.uploadFile(localPath, remotePath, options || {}); }, download: async (remotePath: string, localPath: string, options?: Partial<SFTPTransferOptions>) => { return this.downloadFile(remotePath, localPath, options || {}); }, copy: async (sourcePath: string, destinationPath: string, options?: Partial<SFTPTransferOptions>) => { // SFTP doesn't support server-side copy, so we download then upload const tempPath = join(this.options.tempDirectory || '/tmp', `sftp-copy-${uuidv4()}`); try { await this.downloadFile(sourcePath, tempPath, options); return await this.uploadFile(tempPath, destinationPath, options); } finally { try { await fs.unlink(tempPath); } catch { // Ignore cleanup errors } } }, move: async (sourcePath: string, destinationPath: string) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); await this.sftpClient.rename(sourcePath, destinationPath); }, remove: async (path: string) => { if (!this.sftpClient) throw new Error('SFTP client not connected'); await this.sftpClient.delete(path); }, exists: async (path: string): Promise<false | FileInfoType> => { if (!this.sftpClient) throw new Error('SFTP client not connected'); return await this.sftpClient.exists(path); }, checksum: async (path: string, algorithm?: 'md5' | 'sha1' | 'sha256') => { // Would require executing checksum command on remote server throw new Error('Remote file checksum not implemented'); }, compare: async (localPath: string, remotePath: string) => { // Compare file sizes and timestamps const localStats = await fs.stat(localPath); const remoteStats = await this.sftpClient!.stat(remotePath); const differences: string[] = []; if (localStats.size !== remoteStats.size) { differences.push(`Size: local=${localStats.size}, remote=${remoteStats.size}`); } if (Math.abs(localStats.mtime.getTime() - remoteStats.modifyTime) > 1000) { differences.push(`Modified time: local=${localStats.mtime}, remote=${new Date(remoteStats.modifyTime)}`); } return { identical: differences.length === 0, differences }; } }; } private setupErrorHandling(): void { this.retryManager.on('retry-attempted', (data) => { this.logger.info(`SFTP retry attempted: ${data.strategy}`); this.emit('retry-attempted', data); }); this.errorRecovery.on('recovery-attempted', (data) => { this.logger.info(`SFTP error recovery attempted: ${data.strategy}`); this.emit('recovery-attempted', data); }); } private setupEventHandlers(): void { this.transferQueue.on('active', () => { this.connectionState.activeTransfers = this.activeTransfers.size; this.emit('queue-active', this.getConnectionState()); }); this.transferQueue.on('idle', () => { this.connectionState.activeTransfers = this.activeTransfers.size; this.emit('queue-idle', this.getConnectionState()); }); } private startKeepAlive(): void { if (!this.options.keepAlive?.enabled) return; this.keepAliveTimer = setInterval(async () => { try { if (this.sftpClient && this.isConnected) { // Send a simple operation to keep connection alive await this.sftpClient.list('/'); this.connectionState.lastActivity = new Date(); } } catch (error) { this.logger.warn('Keep-alive check failed:', error); this.emit('connection-warning', { type: 'keepalive-failed', error }); } }, this.options.keepAlive.interval); } private startHealthCheck(): void { this.healthCheckTimer = setInterval(async () => { try { await this.performHealthCheck(); } catch (error) { this.logger.error('Health check failed:', error); this.emit('health-check-failed', error); } }, 60000); // Check every minute } private async performHealthCheck(): Promise<void> { if (!this.isConnected || !this.sftpClient) { throw new Error('SFTP not connected'); } // Test basic connectivity await this.sftpClient.list('/'); // Update connection state this.connectionState.lastActivity = new Date(); this.emit('health-check-passed', this.getConnectionState()); } private getPriority(priority?: 'low' | 'normal' | 'high' | 'critical'): number { switch (priority) { case 'critical': return 4; case 'high': return 3; case 'normal': return 2; case 'low': return 1; default: return 2; } } private mapFileType(type: string): 'file' | 'directory' | 'symlink' | 'special' { switch (type) { case 'd': return 'directory'; case 'l': return 'symlink'; case '-': return 'file'; default: return 'special'; } } private parseRights(rights: { user: string; group: string; other: string }): number { const parsePermission = (perm: string): number => { let result = 0; if (perm.includes('r')) result += 4; if (perm.includes('w')) result += 2; if (perm.includes('x')) result += 1; return result; }; const user = parsePermission(rights.user); const group = parsePermission(rights.group); const other = parsePermission(rights.other); return parseInt(`${user}${group}${other}`, 8); } private updateConnectionStats(operation: 'upload' | 'download', bytes: number): void { if (operation === 'upload') { this.connectionState.transferStats.totalUploads++; } else { this.connectionState.transferStats.totalDownloads++; } this.connectionState.transferStats.totalBytes += bytes; // Calculate average speed (simplified) const totalTransfers = this.connectionState.transferStats.totalUploads + this.connectionState.transferStats.totalDownloads; const totalTime = (Date.now() - this.connectionState.connectionTime.getTime()) / 1000; if (totalTime > 0) { this.connectionState.transferStats.averageSpeed = this.connectionState.transferStats.totalBytes / totalTime; } } private updateBatchProgress(batch: SFTPBatchTransfer): void { batch.overallProgress = (batch.completedTransfers / batch.totalTransfers) * 100; this.emit('batch-progress', batch); } private isLocalPath(path: string): boolean { // Simple heuristic to determine if path is local return !path.includes('@') && (path.startsWith('/') || path.match(/^[A-Za-z]:/) !== null); } private isRetryableError(error: any): boolean { const retryableErrors = ['ECONNRESET', 'ETIMEDOUT', 'ENOTFOUND', 'ECONNREFUSED']; return retryableErrors.includes(error.code) || error.message.includes('timeout'); } private async getTransferSize(transfer: SFTPTransferOptions): Promise<number> { try { if (this.isLocalPath(transfer.source)) { const stats = await fs.stat(transfer.source); return stats.size; } else { const stats = await this.sftpClient!.stat(transfer.source); return stats.size; } } catch (error) { return 0; } } // Placeholder methods for sync functionality private async analyzeDirectory(path: string, options: SFTPSyncOptions): Promise<SFTPFileInfo[]> { // Implementation would recursively analyze local directory return []; } private async analyzeRemoteDirectory(path: string, options: SFTPSyncOptions): Promise<SFTPFileInfo[]> { // Implementation would recursively analyze remote directory return []; } private generateSyncPlan( localFiles: SFTPFileInfo[], remoteFiles: SFTPFileInfo[], options: SFTPSyncOptions ): any { // Implementation would compare files and generate sync plan return {}; } private async executeSyncPlan(plan: any, result: SFTPSyncResult, options: SFTPSyncOptions): Promise<void> { // Implementation would execute the sync plan } private async executeResume(progress: SFTPTransferProgress, resumeInfo: any): Promise<SFTPTransferProgress> { // Implementation would resume interrupted transfer throw new Error('Resume not implemented yet'); } } /** * Bandwidth controller for transfer throttling */ class BandwidthController { private uploadLimit: number; private downloadLimit: number; private adaptiveThrottling: boolean; private currentUploadSpeed: number = 0; private currentDownloadSpeed: number = 0; constructor(options: any) { this.uploadLimit = options.uploadLimit || 0; this.downloadLimit = options.downloadLimit || 0; this.adaptiveThrottling = options.adaptiveThrottling || false; } async throttleUpload(bytes: number): Promise<void> { if (this.uploadLimit > 0 && this.currentUploadSpeed > this.uploadLimit) { const delay = (bytes / this.uploadLimit) * 1000; await new Promise(resolve => setTimeout(resolve, delay)); } } async throttleDownload(bytes: number): Promise<void> { if (this.downloadLimit > 0 && this.currentDownloadSpeed > this.downloadLimit) { const delay = (bytes / this.downloadLimit) * 1000; await new Promise(resolve => setTimeout(resolve, delay)); } } updateSpeed(operation: 'upload' | 'download', bytesPerSecond: number): void { if (operation === 'upload') { this.currentUploadSpeed = bytesPerSecond; } else { this.currentDownloadSpeed = bytesPerSecond; } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ooples/mcp-console-automation'

If you have feedback or need assistance with the MCP directory API, please join our Discord server