Skip to main content
Glama
ooples

MCP Console Automation Server

UnixSocketProtocol.ts (18.7 kB)
import * as net from 'net';
import * as fs from 'fs';
import * as path from 'path';
import { StringDecoder } from 'string_decoder';
import { BaseProtocol } from '../core/BaseProtocol.js';
import {
  ConsoleSession,
  SessionOptions,
  ConsoleType,
  ConsoleOutput,
} from '../types/index.js';
import {
  ProtocolCapabilities,
  SessionState,
  ErrorContext,
  ProtocolHealthStatus,
  ErrorRecoveryResult,
  ResourceUsage,
} from '../core/IProtocol.js';

/**
 * Connection options accepted by the Unix Socket protocol.
 *
 * Only a subset is consumed by the implementation in this file:
 * `socketPath`, `socketMode`, `socketPermissions` / `socketPerms`,
 * `connectTimeout`, `encoding`, `allowHalfOpen`, `enableSocketKeepAlive`,
 * `socketKeepAliveDelay` and `enableNodelay`. The remaining fields are
 * declared but not read here.
 */
interface UnixSocketConnectionOptions extends SessionOptions {
  // Basic Socket Configuration
  socketPath: string;
  socketType?: 'stream' | 'dgram';
  socketMode?: 'client' | 'server';
  createSocket?: boolean;

  // Server Configuration
  serverPath?: string;
  serverBacklog?: number;
  serverExclusive?: boolean;
  enableReuseAddr?: boolean;

  // Client Configuration
  clientPath?: string;
  autoReconnect?: boolean;
  reconnectInterval?: number;
  maxReconnectAttempts?: number;

  // Socket Options
  allowHalfOpen?: boolean;
  pauseOnConnect?: boolean;
  readable?: boolean;
  writable?: boolean;

  // Permissions and Security
  socketUser?: string;
  socketGroup?: string;
  /** Numeric file mode applied to a server socket file (e.g. `0o660`). */
  socketPermissions?: number;
  /** Octal-string file mode (e.g. `'660'`); used when `socketPermissions` is unset. */
  socketPerms?: string;
  enableCredentials?: boolean;

  // Buffer and Performance
  bufferSize?: number;
  highWaterMark?: number;
  objectMode?: boolean;
  encoding?: BufferEncoding;

  // Message Framing
  messageFraming?: 'none' | 'length' | 'delimiter' | 'json' | 'line';
  messageDelimiter?: string;
  maxMessageSize?: number;
  enableHeartbeat?: boolean;
  heartbeatInterval?: number;

  // Timeout Configuration
  connectTimeout?: number;
  idleTimeout?: number;
  keepAliveTimeout?: number;

  // Protocol Features
  enableAck?: boolean;
  enableSequencing?: boolean;
  enableRetransmission?: boolean;
  enableCompression?: boolean;
  compressionAlgorithm?: 'gzip' | 'deflate' | 'brotli';

  // Error Handling
  enableErrorRecovery?: boolean;
  errorRetryAttempts?: number;
  errorRetryDelay?: number;

  // Monitoring and Logging
  enableMetrics?: boolean;
  enableDebugLogging?: boolean;
  logLevel?: 'error' | 'warn' | 'info' | 'debug' | 'trace';

  // Advanced Socket Features
  enableSocketReuse?: boolean;
  enableSocketKeepAlive?: boolean;
  socketKeepAliveDelay?: number;
  enableNodelay?: boolean;

  // Event Handling
  enableEventLogging?: boolean;
  eventLogPath?: string;

  // Custom Protocol
  customProtocol?: string;
  protocolVersion?: string;
  enableHandshake?: boolean;
  handshakeTimeout?: number;

  // File System Integration
  enableFileWatch?: boolean;
  fileWatchPath?: string;
  enableDirectorySync?: boolean;

  // Process Integration
  enableProcessBinding?: boolean;
  bindToProcess?: number;
  processIpcPath?: string;

  // Container Integration
  enableContainerBinding?: boolean;
  containerName?: string;
  containerNamespace?: string;

  // Load Balancing
  enableLoadBalancing?: boolean;
  loadBalanceStrategy?: 'round-robin' | 'least-connections' | 'hash';
  poolSize?: number;

  // Security and Access Control
  enableAccessControl?: boolean;
  allowedUsers?: string[];
  allowedGroups?: string[];
  enableAuditLogging?: boolean;
  auditLogPath?: string;

  // Backup and Recovery
  enableBackup?: boolean;
  backupPath?: string;
  enableAutoRecovery?: boolean;
  recoveryTimeout?: number;

  // Performance Tuning
  enableBatching?: boolean;
  batchSize?: number;
  batchTimeout?: number;
  enablePipelining?: boolean;
  pipelineDepth?: number;

  // Protocol Extensions
  enableExtensions?: boolean;
  extensionPath?: string;
  customHeaders?: Record<string, string>;

  // Multi-socket Support
  enableMultiSocket?: boolean;
  socketCount?: number;
  socketPrefix?: string;

  // Unix-specific Features
  enableAbstractNamespace?: boolean;
  enableLinuxSocketOptions?: boolean;
  enableBsdSocketOptions?: boolean;
  enableSocketFiltering?: boolean;

  // Advanced IPC Features
  enableSharedMemory?: boolean;
  sharedMemorySize?: number;
  enableSignalHandling?: boolean;
  signalMask?: string[];

  // Database Integration
  enableDbLogging?: boolean;
  dbConnectionString?: string;

  // Message Queue Integration
  enableMqIntegration?: boolean;
  mqConnectionString?: string;

  // Web Integration
  enableWebSocketBridge?: boolean;
  webSocketPort?: number;
  enableHttpBridge?: boolean;
  httpPort?: number;

  // Testing and Development
  enableTestMode?: boolean;
  mockResponses?: Record<string, string>;
  enableProfiling?: boolean;
  profilingPath?: string;
}

/** Connection state tracked per session in `socketStates`. */
type UnixSocketState = 'connecting' | 'connected' | 'disconnected' | 'error';

/**
 * Unix Socket Protocol Implementation
 *
 * Provides Unix domain socket communication for IPC. A session wraps either
 * a client connection (`net.Socket`) or a listening server (`net.Server`),
 * selected by `socketMode`. Data received on a socket is decoded with the
 * configured encoding and appended to the session's output buffer.
 */
export class UnixSocketProtocol extends BaseProtocol {
  public readonly type: ConsoleType = 'unix-socket';
  public readonly capabilities: ProtocolCapabilities;

  /** Primary handle per session: a client socket or a listening server. */
  private sockets = new Map<string, net.Socket | net.Server>();

  /** Connection state per session; an entry exists only while the session is open. */
  private socketStates = new Map<string, UnixSocketState>();

  /**
   * Connections accepted by server-mode sessions. `net.Server.close()` only
   * stops accepting new connections, so these must be tracked to be written
   * to and torn down explicitly.
   */
  private serverConnections = new Map<string, Set<net.Socket>>();

  /** Compatibility property for the old ProtocolFactory interface. */
  public get healthStatus(): ProtocolHealthStatus {
    return {
      isHealthy: this.isInitialized,
      lastChecked: new Date(),
      errors: [],
      warnings: [],
      metrics: {
        activeSessions: this.sessions.size,
        totalSessions: this.sessions.size,
        averageLatency: 0,
        successRate: 100,
        uptime: 0,
      },
      dependencies: {},
    };
  }

  constructor() {
    super('unix-socket');

    this.capabilities = {
      supportsStreaming: true,
      supportsFileTransfer: true,
      supportsX11Forwarding: false,
      supportsPortForwarding: false,
      supportsAuthentication: true,
      supportsEncryption: false, // Unix sockets are local, security through filesystem permissions
      supportsCompression: true,
      supportsMultiplexing: true,
      supportsKeepAlive: true,
      supportsReconnection: true,
      supportsBinaryData: true,
      supportsCustomEnvironment: true,
      supportsWorkingDirectory: true,
      supportsSignals: true,
      supportsResizing: false,
      supportsPTY: false,
      maxConcurrentSessions: 1000, // Unix sockets can handle many connections
      defaultTimeout: 30000, // Socket operations
      supportedEncodings: ['utf-8', 'ascii', 'binary', 'base64', 'hex'],
      supportedAuthMethods: ['file-permissions', 'process-credentials'],
      platformSupport: {
        windows: false, // Unix domain sockets are primarily Unix/Linux
        linux: true,
        macos: true,
        freebsd: true,
      },
    };
  }

  /**
   * Probes Unix socket availability once and marks the protocol initialized.
   *
   * @throws The probe error when a test socket cannot be created.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) return;

    try {
      // Check if Unix socket functionality is available
      await this.checkUnixSocketAvailability();
      this.isInitialized = true;
      this.logger.info('Unix Socket protocol initialized with IPC features');
    } catch (error) {
      this.logger.error('Failed to initialize Unix Socket protocol', error);
      throw error;
    }
  }

  /** Generates a session id and delegates creation to the base class. */
  async createSession(options: SessionOptions): Promise<ConsoleSession> {
    const sessionId = `unix-socket-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`;
    return await this.createSessionWithTypeDetection(sessionId, options);
  }

  /** Releases every socket held by this protocol instance. */
  async dispose(): Promise<void> {
    await this.cleanup();
  }

  /**
   * Sends a command as a single newline-terminated JSON message of the form
   * `{ type: 'command', command, args, timestamp }`.
   */
  async executeCommand(
    sessionId: string,
    command: string,
    args?: string[]
  ): Promise<void> {
    const data = JSON.stringify({
      type: 'command',
      command,
      args: args || [],
      timestamp: new Date().toISOString(),
    });

    await this.sendInput(sessionId, data + '\n');
  }

  /**
   * Writes raw input to the session's socket.
   *
   * For a client session the data goes to the single connection. For a
   * server session it is written to every currently connected client
   * (previously it was silently dropped).
   *
   * @throws Error when the session or its socket is unknown.
   */
  async sendInput(sessionId: string, input: string): Promise<void> {
    const socket = this.sockets.get(sessionId);
    const session = this.sessions.get(sessionId);

    if (!socket || !session) {
      throw new Error(`No active Unix socket session: ${sessionId}`);
    }

    if (socket instanceof net.Socket) {
      socket.write(input);
    } else {
      const connections = this.serverConnections.get(sessionId);
      if (connections) {
        for (const connection of Array.from(connections)) {
          // A destroyed connection stays in the set until its 'close' event fires.
          if (!connection.destroyed) {
            connection.write(input);
          }
        }
      }
    }

    session.lastActivity = new Date();

    this.emit('input-sent', {
      sessionId,
      input,
      timestamp: new Date(),
    });

    this.logger.debug(
      `Sent input to Unix socket session ${sessionId}: ${input.substring(0, 100)}`
    );
  }

  /**
   * Closes the session's socket (and, for servers, every accepted
   * connection), then removes all bookkeeping for the session.
   */
  async closeSession(sessionId: string): Promise<void> {
    try {
      const socket = this.sockets.get(sessionId);
      if (socket) {
        this.releaseSocket(sessionId, socket);
        this.sockets.delete(sessionId);
        this.socketStates.delete(sessionId);
      }

      // Clean up base protocol session
      this.sessions.delete(sessionId);

      this.logger.info(`Unix socket session ${sessionId} closed`);
      this.emit('session-closed', sessionId);
    } catch (error) {
      this.logger.error(
        `Error closing Unix socket session ${sessionId}:`,
        error
      );
      throw error;
    }
  }

  /**
   * Creates the underlying client connection or listening server and
   * registers the session.
   *
   * @throws Error when `socketPath` is missing, or the connect/listen error.
   */
  async doCreateSession(
    sessionId: string,
    options: SessionOptions,
    sessionState: SessionState
  ): Promise<ConsoleSession> {
    if (!this.isInitialized) {
      await this.initialize();
    }

    const socketOptions = options as UnixSocketConnectionOptions;

    // Validate required socket parameters
    if (!socketOptions.socketPath) {
      throw new Error('Socket path is required for Unix Socket protocol');
    }

    // Record 'connecting' BEFORE the socket is created: the create helpers
    // move the state to 'connected', and setting it afterwards would
    // overwrite that result.
    this.socketStates.set(sessionId, 'connecting');

    // Create socket based on mode
    let socket: net.Socket | net.Server;
    try {
      socket =
        socketOptions.socketMode === 'server'
          ? await this.createUnixServer(sessionId, socketOptions)
          : await this.createUnixClient(sessionId, socketOptions);
    } catch (error) {
      // Do not leave state behind for a session that never came to exist.
      this.socketStates.delete(sessionId);
      this.serverConnections.delete(sessionId);
      throw error;
    }

    // Store the socket
    this.sockets.set(sessionId, socket);

    // Create session object
    const session: ConsoleSession = {
      id: sessionId,
      command: 'unix-socket',
      args: [socketOptions.socketPath],
      cwd: options.cwd || process.cwd(),
      env: { ...process.env, ...options.env },
      createdAt: new Date(),
      lastActivity: new Date(),
      status: 'running',
      type: this.type,
      streaming: options.streaming,
      executionState: 'idle',
      activeCommands: new Map(),
    };

    this.sessions.set(sessionId, session);

    this.logger.info(
      `Unix socket session ${sessionId} created for path ${socketOptions.socketPath}`
    );
    this.emit('session-created', { sessionId, type: 'unix-socket', session });

    return session;
  }

  // Override getOutput to satisfy old ProtocolFactory interface (returns string).
  // The `any` return type is kept deliberately so the override stays
  // assignable to the base-class signature.
  async getOutput(sessionId: string, since?: Date): Promise<any> {
    const outputs = await super.getOutput(sessionId, since);
    return outputs.map((output) => output.data).join('');
  }

  // Missing IProtocol methods for compatibility

  /** Returns every session known to this protocol. */
  getAllSessions(): ConsoleSession[] {
    return Array.from(this.sessions.values());
  }

  /** Returns the sessions whose status is `'running'`. */
  getActiveSessions(): ConsoleSession[] {
    return Array.from(this.sessions.values()).filter(
      (session) => session.status === 'running'
    );
  }

  /** Returns the number of sessions known to this protocol. */
  getSessionCount(): number {
    return this.sessions.size;
  }

  /**
   * Builds a state snapshot for a session, including the socket state
   * (`'unknown'` when none is recorded).
   *
   * @throws Error when the session does not exist.
   */
  async getSessionState(sessionId: string): Promise<SessionState> {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`Session ${sessionId} not found`);
    }

    return {
      sessionId,
      status: session.status,
      isOneShot: false, // Unix socket sessions are typically persistent
      isPersistent: true,
      createdAt: session.createdAt,
      lastActivity: session.lastActivity,
      metadata: {
        socketState: this.socketStates.get(sessionId) || 'unknown',
      },
    };
  }

  /** Logs the error and reports that no recovery was attempted. */
  async handleError(
    error: Error,
    context: ErrorContext
  ): Promise<ErrorRecoveryResult> {
    this.logger.error(
      `Error in Unix socket session ${context.sessionId}: ${error.message}`
    );

    return {
      recovered: false,
      strategy: 'none',
      attempts: 0,
      duration: 0,
      error: error.message,
    };
  }

  /**
   * Reports whether the session still has a socket in the `'connected'`
   * state. No reconnection is attempted.
   */
  async recoverSession(sessionId: string): Promise<boolean> {
    return (
      this.sockets.has(sessionId) &&
      this.socketStates.get(sessionId) === 'connected'
    );
  }

  /** Returns process-level memory/CPU figures plus socket and session counts. */
  getResourceUsage(): ResourceUsage {
    const memUsage = process.memoryUsage();
    const cpuUsage = process.cpuUsage();

    return {
      memory: {
        used: memUsage.heapUsed,
        available: memUsage.heapTotal,
        peak: memUsage.heapTotal,
      },
      cpu: {
        usage: cpuUsage.user + cpuUsage.system,
        load: [0, 0, 0],
      },
      network: {
        bytesIn: 0,
        bytesOut: 0,
        connectionsActive: this.sockets.size,
      },
      storage: {
        bytesRead: 0,
        bytesWritten: 0,
      },
      sessions: {
        active: this.sessions.size,
        total: this.sessions.size,
        peak: this.sessions.size,
      },
    };
  }

  /**
   * Extends the base health status with a live Unix socket availability
   * probe, reported under `dependencies['unix-sockets']`.
   */
  async getHealthStatus(): Promise<ProtocolHealthStatus> {
    const baseStatus = await super.getHealthStatus();

    try {
      await this.checkUnixSocketAvailability();
      return {
        ...baseStatus,
        dependencies: {
          'unix-sockets': { available: true },
        },
      };
    } catch (error) {
      return {
        ...baseStatus,
        isHealthy: false,
        errors: [...baseStatus.errors, `Unix sockets not available: ${error}`],
        dependencies: {
          'unix-sockets': { available: false },
        },
      };
    }
  }

  /**
   * Verifies that a Unix domain socket can be created by briefly listening
   * on a throwaway path under `/tmp`.
   *
   * Rejects on Windows or when the test server fails to listen.
   */
  private async checkUnixSocketAvailability(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // Check if we're on a Unix-like platform
      if (process.platform === 'win32') {
        reject(new Error('Unix domain sockets are not supported on Windows'));
        return;
      }

      // Try to create a temporary Unix socket to test availability.
      // pid + random suffix keep concurrent probes (e.g. initialize() racing
      // getHealthStatus()) from colliding on the same path.
      const uniqueSuffix = `${process.pid}-${Date.now()}-${Math.random().toString(36).substring(2, 10)}`;
      const testPath = path.join('/tmp', `test-socket-${uniqueSuffix}`);
      const testServer = net.createServer();

      testServer.on('error', (error) => {
        reject(new Error(`Unix socket test failed: ${error.message}`));
      });

      testServer.listen(testPath, () => {
        testServer.close(() => {
          // Clean up test socket file
          try {
            fs.unlinkSync(testPath);
          } catch (e) {
            // Ignore cleanup errors
          }
          resolve();
        });
      });
    });
  }

  /**
   * Starts a server listening on `options.socketPath`, replacing any existing
   * file at that path, and applies `socketPermissions` / `socketPerms` to the
   * socket file when one of them is given.
   *
   * Resolves once the server is listening; rejects on a listen or chmod
   * failure.
   */
  private async createUnixServer(
    sessionId: string,
    options: UnixSocketConnectionOptions
  ): Promise<net.Server> {
    return new Promise<net.Server>((resolve, reject) => {
      const server = net.createServer((socket) => {
        this.trackServerConnection(sessionId, socket);
        this.setupSocketHandlers(sessionId, socket, options);
      });

      // Remove existing socket file if it exists
      if (fs.existsSync(options.socketPath)) {
        fs.unlinkSync(options.socketPath);
      }

      server.on('error', (error) => {
        this.setSocketState(sessionId, 'error');
        this.logger.error(`Unix socket server error: ${error.message}`);
        reject(error);
      });

      server.listen(options.socketPath, () => {
        // Set socket permissions if specified. `socketMode` is the
        // client/server selector and must not be used as a file mode.
        const permissions = options.socketPermissions ?? options.socketPerms;
        if (permissions !== undefined) {
          try {
            fs.chmodSync(options.socketPath, permissions);
          } catch (error) {
            // Throwing from this callback would be an uncaught exception;
            // fail the session creation instead.
            this.setSocketState(sessionId, 'error');
            this.logger.error(
              `Failed to set permissions on ${options.socketPath}:`,
              error
            );
            server.close();
            reject(error);
            return;
          }
        }

        this.setSocketState(sessionId, 'connected');
        this.logger.info(
          `Unix socket server listening on ${options.socketPath}`
        );
        resolve(server);
      });
    });
  }

  /**
   * Connects to the Unix socket at `options.socketPath`.
   *
   * Resolves once connected; rejects on a connection error or when
   * `connectTimeout` elapses first. The timeout applies to the connect phase
   * only and is disarmed as soon as the connection is established.
   */
  private async createUnixClient(
    sessionId: string,
    options: UnixSocketConnectionOptions
  ): Promise<net.Socket> {
    return new Promise<net.Socket>((resolve, reject) => {
      const socket = net.createConnection(options.socketPath);

      const onConnectTimeout = (): void => {
        socket.destroy();
        reject(
          new Error(`Connection timeout after ${options.connectTimeout}ms`)
        );
      };

      const onConnectError = (error: Error): void => {
        this.setSocketState(sessionId, 'error');
        this.logger.error(`Unix socket client error: ${error.message}`);
        reject(error);
      };

      socket.on('error', onConnectError);

      socket.on('connect', () => {
        // Connect phase is over: drop the connect-only listeners and disarm
        // the timer so it cannot fire later as an idle timeout.
        socket.removeListener('error', onConnectError);
        socket.removeListener('timeout', onConnectTimeout);
        socket.setTimeout(0);

        this.setSocketState(sessionId, 'connected');
        this.logger.info(
          `Unix socket client connected to ${options.socketPath}`
        );
        this.setupSocketHandlers(sessionId, socket, options);
        resolve(socket);
      });

      // Set connection timeout if specified
      if (options.connectTimeout) {
        socket.once('timeout', onConnectTimeout);
        socket.setTimeout(options.connectTimeout);
      }
    });
  }

  /**
   * Wires data/error/close/end handlers for a connected socket and applies
   * the per-socket options (`allowHalfOpen`, keep-alive, no-delay).
   */
  private setupSocketHandlers(
    sessionId: string,
    socket: net.Socket,
    options: UnixSocketConnectionOptions
  ): void {
    // A stateful decoder keeps multi-byte characters intact when they are
    // split across two 'data' chunks.
    const decoder = new StringDecoder(options.encoding || 'utf-8');

    const pushOutput = (text: string): void => {
      // The decoder returns '' while it is holding an incomplete character.
      if (text.length === 0) return;
      const output: ConsoleOutput = {
        sessionId,
        type: 'stdout',
        data: text,
        timestamp: new Date(),
      };
      this.addToOutputBuffer(sessionId, output);
    };

    // Set up data handling
    socket.on('data', (data) => {
      pushOutput(decoder.write(data));
    });

    socket.on('error', (error) => {
      this.logger.error(`Unix socket error for session ${sessionId}:`, error);
      this.setSocketState(sessionId, 'error');
      this.emit('session-error', { sessionId, error });
    });

    socket.on('close', () => {
      // Flush whatever the decoder is still holding before completing.
      pushOutput(decoder.end());
      this.logger.info(`Unix socket closed for session ${sessionId}`);
      this.setSocketState(sessionId, 'disconnected');
      // NOTE(review): for server-mode sessions this runs whenever any single
      // client disconnects — confirm that is the intended completion rule.
      this.markSessionComplete(sessionId, 0);
    });

    socket.on('end', () => {
      this.logger.info(`Unix socket ended for session ${sessionId}`);
    });

    // Set socket options
    if (options.allowHalfOpen !== undefined) {
      socket.allowHalfOpen = options.allowHalfOpen;
    }

    if (options.enableSocketKeepAlive && options.socketKeepAliveDelay) {
      socket.setKeepAlive(true, options.socketKeepAliveDelay);
    }

    if (options.enableNodelay) {
      socket.setNoDelay(true);
    }
  }

  /** Closes every socket, clears all bookkeeping, then runs the base cleanup. */
  async cleanup(): Promise<void> {
    this.logger.info('Cleaning up Unix Socket protocol');

    // Close all sockets
    for (const [sessionId, socket] of Array.from(this.sockets)) {
      try {
        this.releaseSocket(sessionId, socket);
      } catch (error) {
        this.logger.error(
          `Error closing Unix socket for session ${sessionId}:`,
          error
        );
      }
    }

    // Clear all data
    this.sockets.clear();
    this.socketStates.clear();
    this.serverConnections.clear();

    // Call parent cleanup
    await super.cleanup();
  }

  /**
   * Updates a session's socket state only while the session is still
   * tracked, so late socket events cannot re-create entries for sessions
   * that were already closed.
   */
  private setSocketState(sessionId: string, state: UnixSocketState): void {
    if (this.socketStates.has(sessionId)) {
      this.socketStates.set(sessionId, state);
    }
  }

  /** Remembers a connection accepted by a server session until it closes. */
  private trackServerConnection(sessionId: string, socket: net.Socket): void {
    let connections = this.serverConnections.get(sessionId);
    if (!connections) {
      connections = new Set<net.Socket>();
      this.serverConnections.set(sessionId, connections);
    }

    const tracked = connections;
    tracked.add(socket);
    socket.once('close', () => {
      tracked.delete(socket);
    });
  }

  /**
   * Shuts down a session's primary handle: ends and destroys a client
   * socket, or stops a server and destroys the connections it accepted.
   */
  private releaseSocket(
    sessionId: string,
    socket: net.Socket | net.Server
  ): void {
    if (socket instanceof net.Socket) {
      socket.end();
      socket.destroy();
      return;
    }

    // Stop accepting first, then drop the live connections that
    // server.close() alone would leave open.
    socket.close();
    const connections = this.serverConnections.get(sessionId);
    if (connections) {
      for (const connection of Array.from(connections)) {
        connection.destroy();
      }
      this.serverConnections.delete(sessionId);
    }
  }
}

export default UnixSocketProtocol;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ooples/mcp-console-automation'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.