import { EventEmitter } from 'events';
import { Socket, createServer, Server, connect } from 'net';
import { platform } from 'os';
import {
createReadStream,
createWriteStream,
unlink,
access,
chown,
chmod,
} from 'fs';
import { promisify } from 'util';
import { randomBytes, createCipher, createDecipher, createHash } from 'crypto';
import { gzip, gunzip, deflate, inflate } from 'zlib';
import * as path from 'path';
import { spawn, ChildProcess } from 'child_process';
import {
IPCConnectionOptions,
IPCSessionState,
IPCMessage,
ConsoleSession,
ConsoleOutput,
} from '../types/index.js';
import { Logger } from '../utils/logger.js';
const unlinkAsync = promisify(unlink);
const accessAsync = promisify(access);
const chownAsync = promisify(chown);
const chmodAsync = promisify(chmod);
const gzipAsync = promisify(gzip);
const gunzipAsync = promisify(gunzip);
const deflateAsync = promisify(deflate);
const inflateAsync = promisify(inflate);
/**
* Production-ready IPC Protocol implementation supporting:
* - Windows Named Pipes
* - Unix Domain Sockets
* - Docker Socket integration
* - Windows Mailslots
* - D-Bus integration
* - COM integration
* - Message framing and protocols
* - Bidirectional communication
* - Stream and datagram modes
*/
export class IPCProtocol extends EventEmitter {
private logger: Logger;
private options: IPCConnectionOptions;
private sessionState: IPCSessionState;
private socket?: Socket;
private server?: Server;
private isServer: boolean = false;
private messageQueue: Map<string, IPCMessage> = new Map();
private connectionAttempts: number = 0;
private reconnectTimer?: NodeJS.Timeout;
private keepAliveTimer?: NodeJS.Timeout;
private messageBuffer: Buffer = Buffer.alloc(0);
private pendingMessages: IPCMessage[] = [];
// Windows-specific handles
private namedPipeHandle?: any;
private mailslotHandle?: any;
// Unix-specific file descriptors
private unixSocketFd?: number;
// Protocol-specific clients
private dbusConnection?: any;
private comInterface?: any;
private dockerClient?: any;
constructor(options: IPCConnectionOptions) {
super();
this.options = options;
this.logger = new Logger('IPCProtocol');
this.sessionState = {
sessionId: this.generateSessionId(),
connectionState: 'disconnected',
ipcType: this.detectIPCType(),
endpoint: options.path,
connectionInfo: {
protocol: this.getProtocolName(),
established: new Date(),
lastActivity: new Date(),
},
statistics: {
messagesReceived: 0,
messagesSent: 0,
bytesReceived: 0,
bytesSent: 0,
errors: 0,
reconnections: 0,
},
};
this.setupErrorHandling();
}
private generateSessionId(): string {
return `ipc-${Date.now()}-${randomBytes(8).toString('hex')}`;
}
private detectIPCType():
| 'named-pipe'
| 'unix-socket'
| 'docker-socket'
| 'mailslot'
| 'dbus'
| 'com' {
if (this.options.namedPipe) return 'named-pipe';
if (this.options.unixSocket) return 'unix-socket';
if (this.options.dockerSocket) return 'docker-socket';
if (this.options.mailslot) return 'mailslot';
if (this.options.dbus) return 'dbus';
if (this.options.com) return 'com';
// Auto-detect based on path
if (platform() === 'win32') {
if (
this.options.path.includes('\\pipe\\') ||
this.options.path.startsWith('\\\\.\\pipe\\')
) {
return 'named-pipe';
}
if (
this.options.path.includes('\\mailslot\\') ||
this.options.path.startsWith('\\\\.\\mailslot\\')
) {
return 'mailslot';
}
} else {
if (
this.options.path.includes('/var/run/docker.sock') ||
this.options.path.includes('docker')
) {
return 'docker-socket';
}
return 'unix-socket';
}
return 'unix-socket';
}
private getProtocolName(): string {
const type = this.sessionState.ipcType;
const mode = this.options.mode || 'stream';
return `${type}-${mode}`;
}
private setupErrorHandling(): void {
this.on('error', (error) => {
this.logger.error('IPC Protocol error:', error);
this.sessionState.statistics.errors++;
this.sessionState.statistics.lastError = error.message;
this.updateLastActivity();
});
}
private updateLastActivity(): void {
this.sessionState.connectionInfo.lastActivity = new Date();
}
/**
* Connect to IPC endpoint
*/
async connect(): Promise<void> {
try {
this.sessionState.connectionState = 'connecting';
this.connectionAttempts++;
switch (this.sessionState.ipcType) {
case 'named-pipe':
await this.connectNamedPipe();
break;
case 'unix-socket':
await this.connectUnixSocket();
break;
case 'docker-socket':
await this.connectDockerSocket();
break;
case 'mailslot':
await this.connectMailslot();
break;
case 'dbus':
await this.connectDBus();
break;
case 'com':
await this.connectCOM();
break;
default:
throw new Error(`Unsupported IPC type: ${this.sessionState.ipcType}`);
}
this.sessionState.connectionState = 'connected';
this.sessionState.connectionInfo.established = new Date();
this.updateLastActivity();
this.setupKeepAlive();
this.processPendingMessages();
this.emit('connected', this.sessionState);
this.logger.info(
`Connected to ${this.sessionState.ipcType} at ${this.options.path}`
);
} catch (error) {
this.sessionState.connectionState = 'error';
this.sessionState.statistics.lastError = (error as Error).message;
this.logger.error(
`Failed to connect to ${this.sessionState.ipcType}:`,
error
);
if (
this.options.reconnect &&
this.connectionAttempts < (this.options.maxReconnectAttempts || 5)
) {
await this.scheduleReconnect();
} else {
throw error;
}
}
}
/**
* Windows Named Pipes connection
*/
private async connectNamedPipe(): Promise<void> {
if (platform() !== 'win32') {
throw new Error('Named Pipes are only supported on Windows');
}
const pipeOptions = this.options.namedPipe!;
const pipeName = this.formatNamedPipePath(pipeOptions.pipeName);
try {
// Use Node.js net module to connect to named pipe
this.socket = connect(pipeName);
this.socket.on('connect', () => {
this.logger.info(`Connected to Named Pipe: ${pipeName}`);
this.sessionState.namedPipeState = {
serverMode: false,
clientCount: 1,
instanceId: 1,
};
});
this.socket.on('data', (data) => {
this.handleIncomingData(data);
});
this.socket.on('error', (error) => {
this.emit('error', error);
});
this.socket.on('close', () => {
this.handleDisconnection();
});
// Wait for connection
await new Promise<void>((resolve, reject) => {
this.socket!.on('connect', resolve);
this.socket!.on('error', reject);
setTimeout(() => {
reject(new Error('Named Pipe connection timeout'));
}, pipeOptions.timeout || 30000);
});
} catch (error) {
throw new Error(
`Failed to connect to Named Pipe ${pipeName}: ${(error as Error).message}`
);
}
}
/**
* Unix Domain Sockets connection
*/
private async connectUnixSocket(): Promise<void> {
if (platform() === 'win32') {
throw new Error(
'Unix Domain Sockets are not natively supported on Windows'
);
}
const unixOptions = this.options.unixSocket!;
let socketPath = unixOptions.socketPath || this.options.path;
// Handle abstract namespace on Linux
if (unixOptions.abstract && platform() === 'linux') {
socketPath = '\x00' + socketPath; // Abstract namespace prefix
}
try {
this.socket = connect(socketPath);
this.socket.on('connect', () => {
this.logger.info(`Connected to Unix Domain Socket: ${socketPath}`);
this.sessionState.unixSocketState = {
socketType: unixOptions.socketType || 'stream',
abstract: unixOptions.abstract || false,
permissions: unixOptions.permissions || '0755',
};
});
this.socket.on('data', (data) => {
this.handleIncomingData(data);
});
this.socket.on('error', (error) => {
this.emit('error', error);
});
this.socket.on('close', () => {
this.handleDisconnection();
});
// Wait for connection
await new Promise<void>((resolve, reject) => {
this.socket!.on('connect', resolve);
this.socket!.on('error', reject);
setTimeout(() => {
reject(new Error('Unix socket connection timeout'));
}, 30000);
});
} catch (error) {
throw new Error(
`Failed to connect to Unix Domain Socket ${socketPath}: ${(error as Error).message}`
);
}
}
/**
* Docker Socket connection
*/
private async connectDockerSocket(): Promise<void> {
const dockerOptions = this.options.dockerSocket!;
const socketPath =
dockerOptions.socketPath || this.getDefaultDockerSocketPath();
try {
this.socket = connect(socketPath);
this.socket.on('connect', async () => {
this.logger.info(`Connected to Docker Socket: ${socketPath}`);
// Initialize Docker API connection
await this.initializeDockerAPI();
this.sessionState.dockerSocketState = {
apiVersion: dockerOptions.apiVersion || '1.41',
serverInfo: {},
containers: [],
};
});
this.socket.on('data', (data) => {
this.handleDockerData(data);
});
this.socket.on('error', (error) => {
this.emit('error', error);
});
this.socket.on('close', () => {
this.handleDisconnection();
});
// Wait for connection
await new Promise<void>((resolve, reject) => {
this.socket!.on('connect', resolve);
this.socket!.on('error', reject);
setTimeout(() => {
reject(new Error('Docker socket connection timeout'));
}, dockerOptions.timeout || 30000);
});
} catch (error) {
throw new Error(
`Failed to connect to Docker Socket ${socketPath}: ${(error as Error).message}`
);
}
}
/**
* Windows Mailslots connection
*/
private async connectMailslot(): Promise<void> {
if (platform() !== 'win32') {
throw new Error('Mailslots are only supported on Windows');
}
const mailslotOptions = this.options.mailslot!;
const mailslotName = this.formatMailslotPath(mailslotOptions.mailslotName);
try {
// For now, simulate mailslot connection using named pipes
// In a full implementation, this would use Windows API directly
this.socket = connect(mailslotName);
this.socket.on('connect', () => {
this.logger.info(`Connected to Mailslot: ${mailslotName}`);
});
this.socket.on('data', (data) => {
this.handleIncomingData(data);
});
this.socket.on('error', (error) => {
this.emit('error', error);
});
await new Promise<void>((resolve, reject) => {
this.socket!.on('connect', resolve);
this.socket!.on('error', reject);
setTimeout(() => {
reject(new Error('Mailslot connection timeout'));
}, mailslotOptions.readTimeout || 30000);
});
} catch (error) {
throw new Error(
`Failed to connect to Mailslot ${mailslotName}: ${(error as Error).message}`
);
}
}
/**
* D-Bus connection (Linux)
*/
private async connectDBus(): Promise<void> {
if (platform() === 'win32') {
throw new Error('D-Bus is not available on Windows');
}
const dbusOptions = this.options.dbus!;
try {
// Simulate D-Bus connection - in production would use dbus library
const busAddress =
dbusOptions.busAddress ||
this.getDBusAddress(dbusOptions.busType || 'session');
// Use Unix socket to connect to D-Bus
this.socket = connect(busAddress);
this.socket.on('connect', () => {
this.logger.info(
`Connected to D-Bus: ${dbusOptions.busType || 'session'}`
);
this.sessionState.dbusState = {
busType: dbusOptions.busType || 'session',
serviceName: dbusOptions.serviceName,
ownedNames: [],
};
});
this.socket.on('data', (data) => {
this.handleDBusMessage(data);
});
this.socket.on('error', (error) => {
this.emit('error', error);
});
await new Promise<void>((resolve, reject) => {
this.socket!.on('connect', resolve);
this.socket!.on('error', reject);
setTimeout(() => {
reject(new Error('D-Bus connection timeout'));
}, dbusOptions.timeout || 30000);
});
} catch (error) {
throw new Error(
`Failed to connect to D-Bus: ${(error as Error).message}`
);
}
}
/**
* COM connection (Windows)
*/
private async connectCOM(): Promise<void> {
if (platform() !== 'win32') {
throw new Error('COM is only available on Windows');
}
const comOptions = this.options.com!;
try {
// Simulate COM connection - in production would use Windows COM API
this.logger.info(
`Connecting to COM object: ${comOptions.progId || comOptions.clsid}`
);
// Use spawn to create a COM automation bridge process
const comProcess = spawn('cscript', ['/nologo', '-'], {
stdio: ['pipe', 'pipe', 'pipe'],
});
// Write VBScript to interact with COM object
const vbScript = this.generateCOMScript(comOptions);
comProcess.stdin.write(vbScript);
comProcess.stdin.end();
comProcess.stdout.on('data', (data) => {
this.handleCOMMessage(data);
});
comProcess.stderr.on('data', (data) => {
this.emit('error', new Error(`COM Error: ${data.toString()}`));
});
this.sessionState.comState = {
progId: comOptions.progId,
clsid: comOptions.clsid,
threadingModel: 'apartment',
};
this.sessionState.connectionState = 'connected';
} catch (error) {
throw new Error(
`Failed to connect to COM object: ${(error as Error).message}`
);
}
}
/**
* Create IPC server
*/
async createServer(): Promise<void> {
this.isServer = true;
this.sessionState.connectionState = 'connecting';
switch (this.sessionState.ipcType) {
case 'named-pipe':
await this.createNamedPipeServer();
break;
case 'unix-socket':
await this.createUnixSocketServer();
break;
default:
throw new Error(
`Server mode not supported for ${this.sessionState.ipcType}`
);
}
this.sessionState.connectionState = 'ready';
this.emit('server-ready', this.sessionState);
this.logger.info(`IPC Server listening on ${this.options.path}`);
}
private async createNamedPipeServer(): Promise<void> {
if (platform() !== 'win32') {
throw new Error('Named Pipes are only supported on Windows');
}
const pipeOptions = this.options.namedPipe!;
const pipeName = this.formatNamedPipePath(pipeOptions.pipeName);
this.server = createServer();
this.server.on('connection', (socket) => {
this.handleClientConnection(socket);
});
this.server.on('error', (error) => {
this.emit('error', error);
});
await new Promise<void>((resolve, reject) => {
this.server!.listen(pipeName, () => {
resolve();
});
this.server!.on('error', reject);
});
}
private async createUnixSocketServer(): Promise<void> {
if (platform() === 'win32') {
throw new Error(
'Unix Domain Sockets are not natively supported on Windows'
);
}
const unixOptions = this.options.unixSocket!;
const socketPath = unixOptions.socketPath || this.options.path;
// Remove existing socket file if requested
if (unixOptions.unlink) {
try {
await unlinkAsync(socketPath);
} catch (error) {
// Ignore if file doesn't exist
}
}
this.server = createServer();
this.server.on('connection', (socket) => {
this.handleClientConnection(socket);
});
this.server.on('error', (error) => {
this.emit('error', error);
});
await new Promise<void>((resolve, reject) => {
this.server!.listen(socketPath, () => {
resolve();
});
this.server!.on('error', reject);
});
// Set permissions if specified
if (unixOptions.permissions) {
await chmodAsync(socketPath, unixOptions.permissions);
}
if (unixOptions.uid !== undefined && unixOptions.gid !== undefined) {
await chownAsync(socketPath, unixOptions.uid, unixOptions.gid);
}
}
private handleClientConnection(socket: Socket): void {
const clientId = this.generateClientId();
this.logger.info(`Client connected: ${clientId}`);
socket.on('data', (data) => {
this.handleIncomingData(data, clientId);
});
socket.on('error', (error) => {
this.logger.error(`Client error ${clientId}:`, error);
});
socket.on('close', () => {
this.logger.info(`Client disconnected: ${clientId}`);
});
this.emit('client-connected', { clientId, socket });
}
private generateClientId(): string {
return `client-${Date.now()}-${randomBytes(4).toString('hex')}`;
}
/**
* Send message over IPC
*/
async sendMessage(
payload: any,
options?: Partial<IPCMessage>
): Promise<void> {
const message: IPCMessage = {
id: this.generateMessageId(),
type: 'command',
timestamp: new Date(),
sessionId: this.sessionState.sessionId,
payload,
encoding: 'utf8',
priority: 'normal',
...options,
};
if (
this.sessionState.connectionState !== 'connected' &&
this.sessionState.connectionState !== 'ready'
) {
this.pendingMessages.push(message);
return;
}
try {
const serializedMessage = await this.serializeMessage(message);
const framedMessage = await this.frameMessage(serializedMessage);
await this.sendRawData(framedMessage);
this.sessionState.statistics.messagesSent++;
this.sessionState.statistics.bytesSent += framedMessage.length;
this.updateLastActivity();
this.emit('message-sent', message);
} catch (error) {
this.sessionState.statistics.errors++;
this.emit('error', error);
throw error;
}
}
/**
* Broadcast message (for mailslots and other broadcast protocols)
*/
async broadcastMessage(
payload: any,
options?: Partial<IPCMessage>
): Promise<void> {
const message: IPCMessage = {
id: this.generateMessageId(),
type: 'broadcast',
timestamp: new Date(),
sessionId: this.sessionState.sessionId,
payload,
encoding: 'utf8',
priority: 'normal',
...options,
};
if (this.sessionState.ipcType === 'mailslot') {
await this.sendMailslotBroadcast(message);
} else {
await this.sendMessage(payload, options);
}
}
private async sendMailslotBroadcast(message: IPCMessage): Promise<void> {
// Implementation would use Windows Mailslot API for true broadcast
const serializedMessage = await this.serializeMessage(message);
const framedMessage = await this.frameMessage(serializedMessage);
await this.sendRawData(framedMessage);
this.sessionState.statistics.messagesSent++;
this.sessionState.statistics.bytesSent += framedMessage.length;
this.updateLastActivity();
this.emit('broadcast-sent', message);
}
/**
* Handle incoming data
*/
private async handleIncomingData(
data: Buffer,
clientId?: string
): Promise<void> {
this.messageBuffer = Buffer.concat([this.messageBuffer, data]);
this.sessionState.statistics.bytesReceived += data.length;
this.updateLastActivity();
try {
const messages = await this.extractMessages();
for (const message of messages) {
const deserializedMessage = await this.deserializeMessage(message);
this.sessionState.statistics.messagesReceived++;
this.emit('message-received', deserializedMessage, clientId);
// Handle response messages
if (
deserializedMessage.type === 'response' &&
deserializedMessage.correlationId
) {
this.handleResponse(deserializedMessage);
}
}
} catch (error) {
this.sessionState.statistics.errors++;
this.emit('error', error);
}
}
private async handleDockerData(data: Buffer): Promise<void> {
// Parse Docker API response
const response = data.toString('utf8');
try {
// Handle HTTP response format
const [headers, body] = response.split('\r\n\r\n');
if (body) {
const jsonData = JSON.parse(body);
this.emit('docker-response', jsonData);
}
} catch (error) {
this.handleIncomingData(data);
}
}
private async handleDBusMessage(data: Buffer): Promise<void> {
// Parse D-Bus message format
// This would require proper D-Bus protocol implementation
this.handleIncomingData(data);
}
private async handleCOMMessage(data: Buffer): Promise<void> {
// Parse COM automation response
const response = data.toString('utf8').trim();
try {
const jsonData = JSON.parse(response);
this.emit('com-response', jsonData);
} catch (error) {
this.emit('com-output', response);
}
}
private handleResponse(message: IPCMessage): void {
const pendingMessage = this.messageQueue.get(message.correlationId!);
if (pendingMessage) {
this.messageQueue.delete(message.correlationId!);
this.emit('response-received', message, pendingMessage);
}
}
/**
* Message serialization and framing
*/
private async serializeMessage(message: IPCMessage): Promise<Buffer> {
let data = JSON.stringify(message);
let buffer = Buffer.from(data, message.encoding || 'utf8');
// Apply compression
if (
this.options.messageFraming?.compression &&
this.options.messageFraming.compression !== 'none'
) {
buffer = Buffer.from(
await this.compressData(buffer, this.options.messageFraming.compression)
);
message.compressed = true;
}
// Apply encryption
if (this.options.messageFraming?.encryption) {
buffer = Buffer.from(
await this.encryptData(buffer, this.options.messageFraming.encryption)
);
message.encrypted = true;
}
return buffer;
}
private async deserializeMessage(buffer: Buffer): Promise<IPCMessage> {
let data = buffer;
// Apply decryption
if (this.options.messageFraming?.encryption) {
data = await this.decryptData(
data,
this.options.messageFraming.encryption
);
}
// Apply decompression
if (
this.options.messageFraming?.compression &&
this.options.messageFraming.compression !== 'none'
) {
data = await this.decompressData(
data,
this.options.messageFraming.compression
);
}
const jsonString = data.toString('utf8');
return JSON.parse(jsonString) as IPCMessage;
}
private async frameMessage(data: Buffer): Promise<Buffer> {
const framingProtocol =
this.options.messageFraming?.protocol || 'length_prefixed';
switch (framingProtocol) {
case 'length_prefixed':
const lengthBytes = this.options.messageFraming?.lengthBytes || 4;
const lengthBuffer = Buffer.alloc(lengthBytes);
lengthBuffer.writeUIntLE(data.length, 0, lengthBytes);
return Buffer.concat([lengthBuffer, data]);
case 'delimiter':
const delimiter = this.options.messageFraming?.delimiter || '\n';
return Buffer.concat([data, Buffer.from(delimiter, 'utf8')]);
case 'json_lines':
return Buffer.concat([data, Buffer.from('\n', 'utf8')]);
case 'fixed_length':
// Pad or truncate to fixed length
const maxSize = this.options.messageFraming?.maxMessageSize || 1024;
if (data.length > maxSize) {
return data.slice(0, maxSize);
}
const paddedBuffer = Buffer.alloc(maxSize);
data.copy(paddedBuffer);
return paddedBuffer;
default:
return data;
}
}
private async extractMessages(): Promise<Buffer[]> {
const messages: Buffer[] = [];
const framingProtocol =
this.options.messageFraming?.protocol || 'length_prefixed';
switch (framingProtocol) {
case 'length_prefixed':
const lengthBytes = this.options.messageFraming?.lengthBytes || 4;
while (this.messageBuffer.length >= lengthBytes) {
const messageLength = this.messageBuffer.readUIntLE(0, lengthBytes);
if (this.messageBuffer.length >= lengthBytes + messageLength) {
const messageData = this.messageBuffer.slice(
lengthBytes,
lengthBytes + messageLength
);
messages.push(messageData);
this.messageBuffer = this.messageBuffer.slice(
lengthBytes + messageLength
);
} else {
break;
}
}
break;
case 'delimiter':
case 'json_lines':
const delimiter =
framingProtocol === 'json_lines'
? '\n'
: this.options.messageFraming?.delimiter || '\n';
const delimiterBuffer = Buffer.from(delimiter, 'utf8');
let delimiterIndex;
while (
(delimiterIndex = this.messageBuffer.indexOf(delimiterBuffer)) !== -1
) {
const messageData = this.messageBuffer.slice(0, delimiterIndex);
messages.push(messageData);
this.messageBuffer = this.messageBuffer.slice(
delimiterIndex + delimiterBuffer.length
);
}
break;
case 'fixed_length':
const fixedLength = this.options.messageFraming?.maxMessageSize || 1024;
while (this.messageBuffer.length >= fixedLength) {
const messageData = this.messageBuffer.slice(0, fixedLength);
messages.push(messageData);
this.messageBuffer = this.messageBuffer.slice(fixedLength);
}
break;
}
return messages;
}
/**
* Data compression and decompression
*/
private async compressData(data: Buffer, algorithm: string): Promise<Buffer> {
switch (algorithm) {
case 'gzip':
return await gzipAsync(data);
case 'deflate':
return await deflateAsync(data);
case 'brotli':
// Would use brotli compression in production
return data;
default:
return data;
}
}
private async decompressData(
data: Buffer,
algorithm: string
): Promise<Buffer> {
switch (algorithm) {
case 'gzip':
return await gunzipAsync(data);
case 'deflate':
return await inflateAsync(data);
case 'brotli':
// Would use brotli decompression in production
return data;
default:
return data;
}
}
/**
* Data encryption and decryption
*/
private async encryptData(data: Buffer, encryption: any): Promise<Buffer> {
const cipher = createCipher(encryption.algorithm, encryption.key);
let encrypted = cipher.update(data);
encrypted = Buffer.concat([encrypted, cipher.final()]);
return encrypted;
}
private async decryptData(data: Buffer, encryption: any): Promise<Buffer> {
const decipher = createDecipher(encryption.algorithm, encryption.key);
let decrypted = decipher.update(data);
decrypted = Buffer.concat([decrypted, decipher.final()]);
return decrypted;
}
/**
* Send raw data over transport
*/
private async sendRawData(data: Buffer): Promise<void> {
if (!this.socket) {
throw new Error('No active connection');
}
return new Promise<void>((resolve, reject) => {
this.socket!.write(data, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
/**
* Connection management
*/
private async scheduleReconnect(): Promise<void> {
const delay = this.options.reconnectInterval || 5000;
this.reconnectTimer = setTimeout(async () => {
try {
await this.connect();
} catch (error) {
this.logger.error('Reconnection failed:', error);
}
}, delay);
}
private setupKeepAlive(): void {
if (this.options.keepAlive && this.options.keepAliveInterval) {
this.keepAliveTimer = setInterval(() => {
this.sendKeepAlive();
}, this.options.keepAliveInterval);
}
}
private async sendKeepAlive(): Promise<void> {
try {
await this.sendMessage(
{ type: 'ping' },
{ type: 'event', priority: 'low' }
);
} catch (error) {
this.logger.error('Keep-alive failed:', error);
}
}
private handleDisconnection(): void {
this.sessionState.connectionState = 'disconnected';
this.emit('disconnected', this.sessionState);
if (this.keepAliveTimer) {
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = undefined;
}
if (this.options.reconnect) {
this.scheduleReconnect();
}
}
/**
* Utility methods
*/
private formatNamedPipePath(pipeName: string): string {
if (pipeName.startsWith('\\\\.\\pipe\\')) {
return pipeName;
}
if (pipeName.startsWith('pipe\\')) {
return '\\\\.' + pipeName;
}
return `\\\\.\\pipe\\${pipeName}`;
}
private formatMailslotPath(mailslotName: string): string {
if (mailslotName.startsWith('\\\\.\\mailslot\\')) {
return mailslotName;
}
if (mailslotName.startsWith('mailslot\\')) {
return '\\\\.' + mailslotName;
}
return `\\\\.\\mailslot\\${mailslotName}`;
}
private getDefaultDockerSocketPath(): string {
return platform() === 'win32'
? '\\\\.\\pipe\\docker_engine'
: '/var/run/docker.sock';
}
private getDBusAddress(busType: string): string {
switch (busType) {
case 'system':
return '/var/run/dbus/system_bus_socket';
case 'session':
return process.env.DBUS_SESSION_BUS_ADDRESS || '/run/user/1000/bus';
default:
return '/var/run/dbus/system_bus_socket';
}
}
private async initializeDockerAPI(): Promise<void> {
// Send Docker version request
const versionRequest = 'GET /version HTTP/1.1\r\nHost: docker\r\n\r\n';
await this.sendRawData(Buffer.from(versionRequest, 'utf8'));
}
private generateCOMScript(comOptions: any): string {
return `
Set objApp = CreateObject("${comOptions.progId}")
' COM automation script would go here
WScript.Echo "{""status"": ""connected"", ""progId"": ""${comOptions.progId}""}"
`;
}
private generateMessageId(): string {
return `msg-${Date.now()}-${randomBytes(4).toString('hex')}`;
}
private async processPendingMessages(): Promise<void> {
const messages = [...this.pendingMessages];
this.pendingMessages = [];
for (const message of messages) {
try {
await this.sendMessage(message.payload, message);
} catch (error) {
this.logger.error('Failed to send pending message:', error);
this.pendingMessages.push(message); // Re-queue on failure
}
}
}
/**
* Get session state
*/
getSessionState(): IPCSessionState {
return { ...this.sessionState };
}
/**
* Get connection statistics
*/
getStatistics(): any {
return {
...this.sessionState.statistics,
connectionState: this.sessionState.connectionState,
uptime:
Date.now() - this.sessionState.connectionInfo.established.getTime(),
pendingMessages: this.pendingMessages.length,
queuedMessages: this.messageQueue.size,
};
}
/**
* Cleanup and disconnect
*/
async disconnect(): Promise<void> {
this.sessionState.connectionState = 'disconnected';
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
}
if (this.keepAliveTimer) {
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = undefined;
}
if (this.socket) {
this.socket.destroy();
this.socket = undefined;
}
if (this.server) {
this.server.close();
this.server = undefined;
}
// Clean up protocol-specific resources
if (this.namedPipeHandle) {
// Close Windows handle
this.namedPipeHandle = undefined;
}
if (this.mailslotHandle) {
// Close mailslot handle
this.mailslotHandle = undefined;
}
if (this.dbusConnection) {
// Close D-Bus connection
this.dbusConnection = undefined;
}
if (this.comInterface) {
// Release COM interface
this.comInterface = undefined;
}
this.emit('disconnected', this.sessionState);
this.logger.info('IPC connection closed');
}
/**
* Destroy the protocol instance
*/
destroy(): void {
this.disconnect();
this.removeAllListeners();
}
}