Skip to main content
Glama
ooples

MCP Console Automation Server

NamedPipeProtocol.ts (21.7 kB)
import { spawn, ChildProcess } from 'child_process';
import { BaseProtocol } from '../core/BaseProtocol.js';
import {
  ConsoleSession,
  SessionOptions,
  ConsoleType,
  ConsoleOutput,
} from '../types/index.js';
import {
  ProtocolCapabilities,
  SessionState,
  ErrorContext,
  ProtocolHealthStatus,
  ErrorRecoveryResult,
  ResourceUsage,
} from '../core/IProtocol.js';
import * as fs from 'fs';
import * as path from 'path';

/**
 * Connection options for the Named Pipe protocol.
 *
 * Only a subset of these fields is consumed by this file:
 * `buildNamedPipeCommand` reads the pipe/FIFO naming, direction, operation and
 * FIFO permission fields, and `buildEnvironment` exports several others as
 * `PIPE_*` environment variables. The remaining fields are declared but not
 * read anywhere in this file.
 */
interface NamedPipeConnectionOptions extends SessionOptions {
  pipeName: string;
  pipeType?: 'byte' | 'message';
  pipeDirection?: 'in' | 'out' | 'duplex';
  pipeAccess?: 'read' | 'write' | 'readwrite';
  pipeMode?:
    | 'byte'
    | 'message'
    | 'nowait'
    | 'wait'
    | 'readmode_byte'
    | 'readmode_message';
  maxInstances?: number;
  outputBufferSize?: number;
  inputBufferSize?: number;
  defaultTimeout?: number;
  securityDescriptor?: string;
  enableInheritHandle?: boolean;
  enableOverlapped?: boolean;
  enableFirstPipeInstance?: boolean;
  enableWriteThrough?: boolean;
  enableServerEnd?: boolean;
  enableClientEnd?: boolean;
  waitNamedPipe?: boolean;
  waitTimeout?: number;
  createIfNotExist?: boolean;
  deleteOnClose?: boolean;
  impersonateClient?: boolean;
  pipeNamespace?: string;
  enableAcl?: boolean;
  aclPermissions?: string[];
  fifoPath?: string;
  fifoMode?: number;
  fifoOwner?: string;
  fifoGroup?: string;
  enableBlocking?: boolean;
  enableNonBlocking?: boolean;
  enableAsync?: boolean;
  bufferSize?: number;
  readTimeout?: number;
  writeTimeout?: number;
  autoFlush?: boolean;
  enableLogging?: boolean;
  logLevel?: 'debug' | 'info' | 'warning' | 'error';
  platform?: 'windows' | 'linux' | 'unix' | 'auto';
  operation?:
    | 'server'
    | 'client'
    | 'connect'
    | 'listen'
    | 'send'
    | 'receive'
    | 'monitor';
  protocol?: 'simple' | 'json' | 'binary' | 'custom';
  messageDelimiter?: string;
  encoding?: 'utf8' | 'ascii' | 'base64' | 'hex' | 'binary';
  enableCompression?: boolean;
  compressionLevel?: number;
  enableChecksum?: boolean;
  checksumAlgorithm?: 'crc32' | 'md5' | 'sha1' | 'sha256';
  enableHeartbeat?: boolean;
  heartbeatInterval?: number;
  reconnectAttempts?: number;
  reconnectDelay?: number;
  environment?: Record<string, string>;
}

/**
 * Named Pipe Protocol Implementation
 *
 * Provides Named Pipe (Windows) and FIFO (Unix) inter-process communication console access.
 * Each session is backed by a child process: PowerShell on Windows, or
 * `sh` / `ls` on Unix-like systems. Input is written to the child's stdin and
 * the child's stdout/stderr are collected into the session output buffer.
 */
export class NamedPipeProtocol extends BaseProtocol {
  /** Prefix of a fully qualified local Windows pipe path (`\\.\pipe\`). */
  private static readonly WINDOWS_PIPE_PREFIX = '\\\\.\\pipe\\';

  /** Delay before a child that ignored SIGTERM is sent SIGKILL. */
  private static readonly FORCE_KILL_DELAY_MS = 5000;

  public readonly type: ConsoleType = 'named-pipe';
  public readonly capabilities: ProtocolCapabilities;

  /** Child process backing each session, keyed by session id. */
  private namedPipeProcesses = new Map<string, ChildProcess>();
  /** Per-session pipe handles; nothing in this file ever populates the map. */
  private pipeHandles = new Map<string, unknown>();

  /**
   * Compatibility property for old ProtocolFactory interface.
   *
   * Health is derived solely from the initialization flag; latency, success
   * rate and uptime are fixed placeholder values.
   */
  public get healthStatus(): ProtocolHealthStatus {
    return {
      isHealthy: this.isInitialized,
      lastChecked: new Date(),
      errors: [],
      warnings: [],
      metrics: {
        activeSessions: this.sessions.size,
        totalSessions: this.sessions.size,
        averageLatency: 0,
        successRate: 100,
        uptime: 0,
      },
      dependencies: {},
    };
  }

  constructor() {
    super('named-pipe');

    this.capabilities = {
      supportsStreaming: true,
      supportsFileTransfer: true,
      supportsX11Forwarding: false,
      supportsPortForwarding: false,
      supportsAuthentication: true,
      supportsEncryption: false,
      supportsCompression: true,
      supportsMultiplexing: true,
      supportsKeepAlive: true,
      supportsReconnection: true,
      supportsBinaryData: true,
      supportsCustomEnvironment: true,
      supportsWorkingDirectory: true,
      supportsSignals: false,
      supportsResizing: false,
      supportsPTY: false,
      maxConcurrentSessions: 50, // Named pipes can handle multiple instances
      defaultTimeout: 30000, // IPC operations are typically fast
      supportedEncodings: ['utf-8', 'ascii', 'binary'],
      supportedAuthMethods: ['pipe', 'acl'],
      platformSupport: {
        windows: true, // Primary named pipe support
        linux: true, // FIFO support
        macos: true, // FIFO support
        freebsd: true, // FIFO support
      },
    };
  }

  /**
   * Verifies that the platform's pipe tooling is available and marks the
   * protocol as initialized. Calling it again after success is a no-op.
   *
   * @throws The availability-check error, after logging it.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) return;

    try {
      // Check if Named Pipe/FIFO tools are available
      await this.checkNamedPipeAvailability();

      this.isInitialized = true;
      this.logger.info(
        'Named Pipe protocol initialized with production features'
      );
    } catch (error: any) {
      this.logger.error('Failed to initialize Named Pipe protocol', error);
      throw error;
    }
  }

  /**
   * Creates a session under a freshly generated `namedpipe-…` id and delegates
   * to the base class' `createSessionWithTypeDetection`.
   */
  async createSession(options: SessionOptions): Promise<ConsoleSession> {
    const sessionId = `namedpipe-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`;
    return await this.createSessionWithTypeDetection(sessionId, options);
  }

  /** Disposes the protocol by running {@link cleanup}. */
  async dispose(): Promise<void> {
    await this.cleanup();
  }

  /**
   * Sends `command` (with space-joined `args`, if any) followed by a newline
   * to the session's child process.
   *
   * @throws If the session has no usable child process (see {@link sendInput}).
   */
  async executeCommand(
    sessionId: string,
    command: string,
    args?: string[]
  ): Promise<void> {
    const fullCommand =
      args && args.length > 0 ? `${command} ${args.join(' ')}` : command;
    await this.sendInput(sessionId, fullCommand + '\n');
  }

  /**
   * Writes raw input to the stdin of the session's child process, updates the
   * session's `lastActivity` and emits `input-sent`.
   *
   * @throws If the session, its process, or a writable stdin is missing.
   */
  async sendInput(sessionId: string, input: string): Promise<void> {
    const namedPipeProcess = this.namedPipeProcesses.get(sessionId);
    const session = this.sessions.get(sessionId);
    const stdin = namedPipeProcess?.stdin;

    // A stdin that has already been closed/destroyed is treated the same as a
    // missing one: writing to it could only fail asynchronously.
    if (!namedPipeProcess || !stdin || !stdin.writable || !session) {
      throw new Error(`No active Named Pipe session: ${sessionId}`);
    }

    stdin.write(input);
    session.lastActivity = new Date();

    this.emit('input-sent', {
      sessionId,
      input,
      timestamp: new Date(),
    });

    this.logger.debug(
      `Sent input to Named Pipe session ${sessionId}: ${input.substring(0, 100)}`
    );
  }

  /**
   * Terminates the session's child process (SIGTERM, then SIGKILL if it is
   * still running after the grace period), forgets the session and emits
   * `session-closed`.
   *
   * @throws Re-throws any error raised while closing, after logging it.
   */
  async closeSession(sessionId: string): Promise<void> {
    try {
      const namedPipeProcess = this.namedPipeProcesses.get(sessionId);
      if (namedPipeProcess) {
        if (this.isProcessRunning(namedPipeProcess)) {
          // Try graceful shutdown first
          namedPipeProcess.kill('SIGTERM');

          // Force kill after timeout. `ChildProcess.killed` only records that a
          // signal was sent, so liveness is checked via exitCode/signalCode.
          const forceKillTimer = setTimeout(() => {
            if (this.isProcessRunning(namedPipeProcess)) {
              namedPipeProcess.kill('SIGKILL');
            }
          }, NamedPipeProtocol.FORCE_KILL_DELAY_MS);

          // No need to keep the timer pending once the child is gone.
          namedPipeProcess.once('exit', () => clearTimeout(forceKillTimer));
        }

        this.namedPipeProcesses.delete(sessionId);
      }

      // Clean up pipe handle if exists
      this.pipeHandles.delete(sessionId);

      // Clean up base protocol session
      this.sessions.delete(sessionId);

      this.logger.info(`Named Pipe session ${sessionId} closed`);
      this.emit('session-closed', sessionId);
    } catch (error) {
      this.logger.error(
        `Error closing Named Pipe session ${sessionId}:`,
        error
      );
      throw error;
    }
  }

  /**
   * Spawns the child process for a session and registers it.
   *
   * Initializes the protocol on demand, builds the platform-specific command
   * line, wires stdout/stderr into the output buffer, and stores both the
   * process and the resulting session object.
   *
   * @param sessionId Id under which the process and session are stored.
   * @param options Session options; read as `NamedPipeConnectionOptions`.
   * @param sessionState Not used by this implementation.
   * @returns The newly created session, with status `running`.
   * @throws If `pipeName` is missing or initialization fails.
   */
  async doCreateSession(
    sessionId: string,
    options: SessionOptions,
    sessionState: SessionState
  ): Promise<ConsoleSession> {
    if (!this.isInitialized) {
      await this.initialize();
    }

    const namedPipeOptions = options as NamedPipeConnectionOptions;

    // Validate required pipe parameters
    if (!namedPipeOptions.pipeName) {
      throw new Error('Pipe name is required for Named Pipe protocol');
    }

    // Build Named Pipe command
    const namedPipeCommand = this.buildNamedPipeCommand(namedPipeOptions);
    const executable = namedPipeCommand[0];
    const executableArgs = namedPipeCommand.slice(1);
    const cwd = options.cwd || process.cwd();

    // Precedence, lowest to highest: inherited env, PIPE_* settings, options.env.
    const env = {
      ...process.env,
      ...this.buildEnvironment(namedPipeOptions),
      ...options.env,
    };

    // Spawn Named Pipe process
    const namedPipeProcess = spawn(executable, executableArgs, {
      stdio: ['pipe', 'pipe', 'pipe'],
      cwd,
      env,
    });

    // Set up output handling
    namedPipeProcess.stdout?.on('data', (data) => {
      const output: ConsoleOutput = {
        sessionId,
        type: 'stdout',
        data: data.toString(),
        timestamp: new Date(),
      };
      this.addToOutputBuffer(sessionId, output);
    });

    namedPipeProcess.stderr?.on('data', (data) => {
      const output: ConsoleOutput = {
        sessionId,
        type: 'stderr',
        data: data.toString(),
        timestamp: new Date(),
      };
      this.addToOutputBuffer(sessionId, output);
    });

    // Without a listener, a failed write to the child's stdin (e.g. EPIPE after
    // the child exits) would surface as an unhandled stream 'error' event.
    namedPipeProcess.stdin?.on('error', (error) => {
      this.logger.error(
        `Named Pipe stdin error for session ${sessionId}:`,
        error
      );
    });

    namedPipeProcess.on('error', (error) => {
      this.logger.error(
        `Named Pipe process error for session ${sessionId}:`,
        error
      );
      this.emit('session-error', { sessionId, error });
    });

    namedPipeProcess.on('close', (code) => {
      this.logger.info(
        `Named Pipe process closed for session ${sessionId} with code ${code}`
      );
      // `code` is null when the child was terminated by a signal.
      this.markSessionComplete(sessionId, code ?? 0);
    });

    // Store the process
    this.namedPipeProcesses.set(sessionId, namedPipeProcess);

    // Create session object
    const session: ConsoleSession = {
      id: sessionId,
      command: executable,
      args: executableArgs,
      cwd,
      env: { ...env },
      createdAt: new Date(),
      lastActivity: new Date(),
      status: 'running',
      type: this.type,
      streaming: options.streaming,
      executionState: 'idle',
      activeCommands: new Map(),
      pid: namedPipeProcess.pid,
    };

    this.sessions.set(sessionId, session);

    this.logger.info(
      `Named Pipe session ${sessionId} created for pipe ${namedPipeOptions.pipeName}`
    );
    this.emit('session-created', { sessionId, type: 'named-pipe', session });

    return session;
  }

  /**
   * Override getOutput to satisfy old ProtocolFactory interface (returns string).
   *
   * Concatenates the `data` of every output entry returned by the base class.
   */
  async getOutput(sessionId: string, since?: Date): Promise<any> {
    const outputs = await super.getOutput(sessionId, since);
    return outputs.map((output) => output.data).join('');
  }

  // Missing IProtocol methods for compatibility

  /** Returns every session currently tracked by this protocol. */
  getAllSessions(): ConsoleSession[] {
    return Array.from(this.sessions.values());
  }

  /** Returns the tracked sessions whose status is `running`. */
  getActiveSessions(): ConsoleSession[] {
    return Array.from(this.sessions.values()).filter(
      (session) => session.status === 'running'
    );
  }

  /** Returns the number of tracked sessions. */
  getSessionCount(): number {
    return this.sessions.size;
  }

  /**
   * Builds a state snapshot for a tracked session.
   *
   * @throws If no session with `sessionId` is tracked.
   */
  async getSessionState(sessionId: string): Promise<SessionState> {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`Session ${sessionId} not found`);
    }

    return {
      sessionId,
      status: session.status,
      isOneShot: false, // Named pipe sessions can be persistent
      isPersistent: true,
      createdAt: session.createdAt,
      lastActivity: session.lastActivity,
      pid: session.pid,
      metadata: {},
    };
  }

  /**
   * Logs the error and reports it as not recovered; no recovery is attempted.
   */
  async handleError(
    error: Error,
    context: ErrorContext
  ): Promise<ErrorRecoveryResult> {
    this.logger.error(
      `Error in Named Pipe session ${context.sessionId}: ${error.message}`
    );

    return {
      recovered: false,
      strategy: 'none',
      attempts: 0,
      duration: 0,
      error: error.message,
    };
  }

  /**
   * Reports whether the session's child process is still running. Nothing is
   * restarted; a session whose process has exited is reported as `false`.
   */
  async recoverSession(sessionId: string): Promise<boolean> {
    const namedPipeProcess = this.namedPipeProcesses.get(sessionId);
    return namedPipeProcess !== undefined
      ? this.isProcessRunning(namedPipeProcess)
      : false;
  }

  /**
   * Returns resource figures for the current Node.js process (heap and CPU
   * time from `process.memoryUsage()` / `process.cpuUsage()`) together with
   * session and child-process counts. Network byte and storage counters are
   * fixed at zero.
   */
  getResourceUsage(): ResourceUsage {
    const memUsage = process.memoryUsage();
    const cpuUsage = process.cpuUsage();

    return {
      memory: {
        used: memUsage.heapUsed,
        available: memUsage.heapTotal,
        peak: memUsage.heapTotal,
      },
      cpu: {
        usage: cpuUsage.user + cpuUsage.system,
        load: [0, 0, 0],
      },
      network: {
        bytesIn: 0,
        bytesOut: 0,
        connectionsActive: this.namedPipeProcesses.size,
      },
      storage: {
        bytesRead: 0,
        bytesWritten: 0,
      },
      sessions: {
        active: this.sessions.size,
        total: this.sessions.size,
        peak: this.sessions.size,
      },
    };
  }

  /**
   * Extends the base health status with a `namedpipe` dependency entry based
   * on a fresh availability probe. A failed probe marks the status unhealthy
   * and appends the failure to `errors`.
   */
  async getHealthStatus(): Promise<ProtocolHealthStatus> {
    const baseStatus = await super.getHealthStatus();

    try {
      await this.checkNamedPipeAvailability();
      return {
        ...baseStatus,
        dependencies: {
          namedpipe: { available: true },
        },
      };
    } catch (error) {
      return {
        ...baseStatus,
        isHealthy: false,
        errors: [...baseStatus.errors, `Named Pipe not available: ${error}`],
        dependencies: {
          namedpipe: { available: false },
        },
      };
    }
  }

  /**
   * Returns true while the child has been spawned and has neither exited nor
   * been terminated by a signal.
   */
  private isProcessRunning(child: ChildProcess): boolean {
    return (
      child.pid !== undefined &&
      child.exitCode === null &&
      child.signalCode === null
    );
  }

  /** Wraps `value` in single quotes for a POSIX shell, escaping embedded quotes. */
  private static quotePosix(value: string): string {
    return `'${value.replace(/'/g, "'\\''")}'`;
  }

  /** Wraps `value` in a PowerShell single-quoted string, doubling embedded quotes. */
  private static quotePowerShell(value: string): string {
    return `'${value.replace(/'/g, "''")}'`;
  }

  /**
   * Runs a short-lived probe command and resolves if it exits with one of the
   * accepted codes.
   *
   * stdio is ignored: the probe output is never read, and leaving it piped
   * could block the child once the OS pipe buffer fills.
   */
  private probeTool(
    command: string,
    args: string[],
    acceptedExitCodes: readonly number[],
    exitFailureMessage: string,
    spawnFailureMessage: string
  ): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const probe = spawn(command, args, { stdio: 'ignore' });

      probe.on('close', (code) => {
        if (code !== null && acceptedExitCodes.includes(code)) {
          resolve();
        } else {
          reject(new Error(exitFailureMessage));
        }
      });

      probe.on('error', () => {
        reject(new Error(spawnFailureMessage));
      });
    });
  }

  /**
   * Checks platform-specific pipe availability: lists `\\.\pipe\` through
   * PowerShell on Windows, runs `mkfifo --help` elsewhere.
   *
   * @throws If the probe cannot be spawned or exits with an unexpected code.
   */
  private async checkNamedPipeAvailability(): Promise<void> {
    if (process.platform === 'win32') {
      // Windows - check for named pipe support via PowerShell
      return this.probeTool(
        'powershell',
        ['-Command', 'Get-ChildItem \\\\.\\pipe\\'],
        [0],
        'Windows Named Pipes not available. Please check system permissions.',
        'PowerShell not available for Named Pipe operations.'
      );
    }

    // Unix-like systems - check for FIFO support
    // mkfifo --help returns 1 on some systems
    return this.probeTool(
      'mkfifo',
      ['--help'],
      [0, 1],
      'FIFO/Named Pipe tools not found. Please install coreutils.',
      'FIFO/Named Pipe tools not found. Please install coreutils.'
    );
  }

  /**
   * Builds the argv (executable first) of the child process for a session.
   *
   * - Windows: `powershell -Command <script>`, where the script creates a
   *   `NamedPipeServerStream` (`server`/`listen`), a `NamedPipeClientStream`
   *   (`client`/`connect`), or lists the pipe path (any other operation).
   * - Otherwise: `sh -c <script>` operating on the FIFO path, or `ls -la` on
   *   it for any other operation.
   *
   * `options.command`, if set, is appended to run after the generated script;
   * `options.args` are appended as trailing argv entries.
   *
   * Pipe names, FIFO paths and the FIFO owner are single-quote escaped before
   * being placed in a script. `options.command` is inserted verbatim as script
   * text and must come from a trusted source.
   */
  private buildNamedPipeCommand(options: NamedPipeConnectionOptions): string[] {
    const command: string[] = [];
    const isWindows = process.platform === 'win32';
    const hostPlatform = isWindows ? 'windows' : 'unix';
    const platform =
      options.platform === 'auto'
        ? hostPlatform
        : options.platform || hostPlatform;
    const operation = options.operation;
    const isServerLike = operation === 'server' || operation === 'listen';
    const isClientLike = operation === 'client' || operation === 'connect';

    // Whether options.command has already been merged into the command line.
    let customCommandApplied = false;

    if (platform === 'windows') {
      // Windows Named Pipe operations via PowerShell
      command.push('powershell', '-Command');

      const prefix = NamedPipeProtocol.WINDOWS_PIPE_PREFIX;
      const pipePath = options.pipeName.startsWith(prefix)
        ? options.pipeName
        : `${prefix}${options.pipeName}`;
      // The .NET pipe stream constructors take the bare name, without the prefix.
      const quotedName = NamedPipeProtocol.quotePowerShell(
        options.pipeName.replace(prefix, '')
      );
      const direction =
        options.pipeDirection === undefined ||
        options.pipeDirection === 'duplex'
          ? 'InOut'
          : options.pipeDirection === 'in'
            ? 'In'
            : 'Out';
      const directionArg = `[System.IO.Pipes.PipeDirection]::${direction}`;

      if (isServerLike) {
        // Create and listen on named pipe
        const ctorArgs = [quotedName];
        // maxInstances is the third positional argument, so a direction must
        // precede it even when the caller did not choose one.
        if (options.pipeDirection || options.maxInstances) {
          ctorArgs.push(directionArg);
        }
        if (options.maxInstances) {
          ctorArgs.push(`${options.maxInstances}`);
        }

        let pipeScript = `$pipe = New-Object System.IO.Pipes.NamedPipeServerStream(${ctorArgs.join(', ')}); $pipe.WaitForConnection(); `;
        if (operation === 'listen') {
          pipeScript += `while($pipe.IsConnected) { Start-Sleep -Milliseconds 100 }`;
        }
        command.push(pipeScript);
      } else if (isClientLike) {
        // Connect to named pipe as client
        const ctorArgs = [`'.'`, quotedName];
        if (options.pipeDirection) {
          ctorArgs.push(directionArg);
        }
        const connectArg = options.waitTimeout ? `${options.waitTimeout}` : '';

        command.push(
          `$pipe = New-Object System.IO.Pipes.NamedPipeClientStream(${ctorArgs.join(', ')}); $pipe.Connect(${connectArg});`
        );
      } else {
        // General pipe operations
        command.push(
          `Get-ChildItem ${NamedPipeProtocol.quotePowerShell(pipePath)}`
        );
      }
    } else {
      // Unix FIFO operations
      const fifoPath = options.fifoPath || `/tmp/${options.pipeName}`;
      const quotedFifo = NamedPipeProtocol.quotePosix(fifoPath);

      if (isServerLike) {
        // Create FIFO and listen
        let fifoScript = '';

        if (options.createIfNotExist) {
          fifoScript += `mkfifo ${quotedFifo} 2>/dev/null || true; `;
        }
        if (options.fifoMode) {
          fifoScript += `chmod ${options.fifoMode.toString(8)} ${quotedFifo}; `;
        }
        if (options.fifoOwner) {
          fifoScript += `chown ${NamedPipeProtocol.quotePosix(options.fifoOwner)} ${quotedFifo}; `;
        }

        if (operation === 'listen') {
          fifoScript += options.enableNonBlocking
            ? `tail -f ${quotedFifo}`
            : `cat ${quotedFifo}`;
        } else {
          fifoScript += `echo ${NamedPipeProtocol.quotePosix(`FIFO created at ${fifoPath}`)}`;
        }

        command.push('sh', '-c', fifoScript);
      } else if (isClientLike) {
        // Connect to FIFO
        let connectScript = '';

        if (options.waitNamedPipe) {
          connectScript += `while [ ! -p ${quotedFifo} ]; do sleep 0.1; done; `;
        }

        connectScript +=
          options.pipeAccess === 'write'
            ? `exec 3>${quotedFifo}; echo 'Connected to FIFO for writing' >&3`
            : `exec 3<${quotedFifo}; cat <&3`;

        command.push('sh', '-c', connectScript);
      } else if (options.command) {
        // General FIFO operations followed by a custom command: this needs a
        // shell, otherwise the command text would end up inside the path
        // argument passed to `ls`.
        command.push('sh', '-c', `ls -la ${quotedFifo}; ${options.command}`);
        customCommandApplied = true;
      } else {
        // General FIFO operations
        command.push('ls', '-la', fifoPath);
      }
    }

    // Add custom command if provided
    if (options.command && !customCommandApplied) {
      if (platform === 'windows') {
        // PowerShell joins everything after -Command into one script.
        command.push(';', options.command);
      } else {
        command[command.length - 1] += `; ${options.command}`;
      }
    }

    // Add arguments
    if (options.args) {
      command.push(...options.args);
    }

    return command;
  }

  /**
   * Translates connection options into `PIPE_*` environment variables for the
   * child process. Entries in `options.environment` are applied last and
   * override the generated ones.
   */
  private buildEnvironment(
    options: NamedPipeConnectionOptions
  ): Record<string, string> {
    const env: Record<string, string> = {};

    // Named Pipe environment variables
    if (options.pipeName) {
      env.PIPE_NAME = options.pipeName;
    }
    if (options.pipeNamespace) {
      env.PIPE_NAMESPACE = options.pipeNamespace;
    }

    // Buffer and timeout settings
    if (options.bufferSize) {
      env.PIPE_BUFFER_SIZE = options.bufferSize.toString();
    }
    if (options.defaultTimeout) {
      env.PIPE_TIMEOUT = options.defaultTimeout.toString();
    }

    // Protocol settings
    if (options.protocol) {
      env.PIPE_PROTOCOL = options.protocol;
    }
    if (options.messageDelimiter) {
      env.PIPE_MESSAGE_DELIMITER = options.messageDelimiter;
    }
    if (options.encoding) {
      env.PIPE_ENCODING = options.encoding;
    }

    // Logging
    if (options.enableLogging && options.logLevel) {
      env.PIPE_LOG_LEVEL = options.logLevel;
      env.PIPE_VERBOSE = '1';
    }

    // Compression
    if (options.enableCompression) {
      env.PIPE_COMPRESSION = '1';
      if (options.compressionLevel) {
        env.PIPE_COMPRESSION_LEVEL = options.compressionLevel.toString();
      }
    }

    // Checksum
    if (options.enableChecksum && options.checksumAlgorithm) {
      env.PIPE_CHECKSUM = '1';
      env.PIPE_CHECKSUM_ALGORITHM = options.checksumAlgorithm;
    }

    // Heartbeat
    if (options.enableHeartbeat && options.heartbeatInterval) {
      env.PIPE_HEARTBEAT = '1';
      env.PIPE_HEARTBEAT_INTERVAL = options.heartbeatInterval.toString();
    }

    // Reconnection
    if (options.reconnectAttempts) {
      env.PIPE_RECONNECT_ATTEMPTS = options.reconnectAttempts.toString();
    }
    if (options.reconnectDelay) {
      env.PIPE_RECONNECT_DELAY = options.reconnectDelay.toString();
    }

    // Custom environment variables
    if (options.environment) {
      Object.assign(env, options.environment);
    }

    return env;
  }

  /**
   * Kills every tracked child process (logging, not throwing, on failure),
   * clears the process and handle maps, then runs the base class cleanup.
   */
  async cleanup(): Promise<void> {
    this.logger.info('Cleaning up Named Pipe protocol');

    // Close all Named Pipe processes
    for (const [sessionId, childProcess] of Array.from(
      this.namedPipeProcesses
    )) {
      try {
        childProcess.kill();
      } catch (error) {
        this.logger.error(
          `Error killing Named Pipe process for session ${sessionId}:`,
          error
        );
      }
    }

    // Clear all data
    this.namedPipeProcesses.clear();
    this.pipeHandles.clear();

    // Call parent cleanup
    await super.cleanup();
  }
}

export default NamedPipeProtocol;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ooples/mcp-console-automation'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.