import WebSocket from 'ws';
import { v4 as uuidv4 } from 'uuid';
import fetch from 'node-fetch';
import FormData from 'form-data';
import fs from 'fs';
import path from 'path';
import { ConnectionManager, ConnectionState } from './utils/connection-manager.js';
/** Age (ms) of the most recent pong beyond which the socket is treated as stale. */
const STALE_PONG_THRESHOLD_MS = 60000;

/**
 * Read an integer from an environment variable.
 *
 * @param {string} name - Environment variable name.
 * @returns {number} The parsed base-10 integer, or NaN when unset or not numeric.
 */
function readEnvInt(name) {
  return Number.parseInt(process.env[name], 10);
}

/**
 * Build an Error describing a failed ComfyUI HTTP call.
 *
 * @param {string} action - Short description of the request (used in the message).
 * @param {{status: number}} response - The fetch response.
 * @param {string} bodyText - Response body, if any; truncated in the message.
 * @returns {Error} Error carrying the HTTP status in its `status` property.
 */
function createHttpError(action, response, bodyText) {
  const detail = bodyText ? `: ${bodyText.slice(0, 500)}` : '';
  const error = new Error(`ComfyUI ${action} failed with HTTP ${response.status}${detail}`);
  error.status = response.status;
  return error;
}

/**
 * Read a fetch response as JSON, tolerating an empty body.
 *
 * @param {object} response - The fetch response.
 * @param {string} action - Short description of the request (used in error messages).
 * @param {object} [settings]
 * @param {boolean} [settings.allowErrorBody=false] - When true, a non-2xx response
 *   whose body is valid JSON is returned instead of thrown.
 * @returns {Promise<any>} The parsed body, or `{}` when the body is empty.
 * @throws {Error} On a non-2xx status (unless allowed above) or a non-JSON body.
 */
async function readJsonResponse(response, action, { allowErrorBody = false } = {}) {
  const bodyText = await response.text();
  const hasBody = bodyText.trim() !== '';
  let parsed = {};
  let isJson = !hasBody;
  if (hasBody) {
    try {
      parsed = JSON.parse(bodyText);
      isJson = true;
    } catch (parseError) {
      // Not JSON; reported below together with the HTTP status.
      isJson = false;
    }
  }
  if (!response.ok && !(allowErrorBody && hasBody && isJson)) {
    throw createHttpError(action, response, bodyText);
  }
  if (!isJson) {
    throw new Error(`ComfyUI ${action} returned a non-JSON response: ${bodyText.slice(0, 200)}`);
  }
  return parsed;
}

/**
 * Detach every listener from a socket and shut it down.
 *
 * @param {object} socket - A `ws` WebSocket instance.
 */
function disposeSocket(socket) {
  socket.removeAllListeners();
  // An aborted handshake (or a failure while closing) is reported through an
  // 'error' event; with no listener attached that event would be thrown as an
  // uncaught exception.
  socket.on('error', () => {});
  if (socket.readyState === WebSocket.OPEN) {
    socket.close();
  } else if (socket.readyState === WebSocket.CONNECTING) {
    socket.terminate();
  }
}

/**
 * Client for a ComfyUI server: a WebSocket for execution events plus the HTTP
 * endpoints for queueing prompts, uploading inputs and downloading results.
 * Connection lifecycle (reconnects, health checks, operation queueing) is
 * delegated to a ConnectionManager.
 */
export class ComfyUIClient {
  /**
   * @param {string} [serverAddress='127.0.0.1:8188'] - `host:port` of the ComfyUI server.
   * @param {object} [options] - Overrides for the connection manager. Each numeric
   *   setting falls back to its `COMFYUI_*` environment variable and then to a
   *   built-in default; a falsy value (including 0) selects the fallback.
   * @param {number} [options.maxReconnectAttempts]
   * @param {number} [options.reconnectDelay]
   * @param {number} [options.maxReconnectDelay]
   * @param {number} [options.heartbeatInterval]
   * @param {boolean} [options.enableAutoReconnect] - Enabled unless exactly `false`.
   */
  constructor(serverAddress = '127.0.0.1:8188', options = {}) {
    this.serverAddress = serverAddress;
    this.clientId = uuidv4();
    this.ws = null;
    // Timestamp of the latest pong on the current socket; null until one arrives.
    this.lastPongReceived = null;
    // Executions currently awaited, keyed by `execution_<promptId>`.
    this.pendingOperations = new Map();

    this.connectionManager = new ConnectionManager({
      maxReconnectAttempts:
        options.maxReconnectAttempts || readEnvInt('COMFYUI_MAX_RECONNECT_ATTEMPTS') || 10,
      reconnectDelay: options.reconnectDelay || readEnvInt('COMFYUI_RECONNECT_DELAY') || 1000,
      maxReconnectDelay:
        options.maxReconnectDelay || readEnvInt('COMFYUI_MAX_RECONNECT_DELAY') || 30000,
      heartbeatInterval:
        options.heartbeatInterval || readEnvInt('COMFYUI_HEARTBEAT_INTERVAL') || 30000,
      enableAutoReconnect: options.enableAutoReconnect !== false
    });
    this.setupConnectionEvents();
  }

  /** Log connection-manager state transitions and reconnect attempts. */
  setupConnectionEvents() {
    this.connectionManager.on('stateChange', ({ to }) => {
      if (to === ConnectionState.CONNECTED) {
        console.log(`✅ ComfyUI connection established at ${this.serverAddress}`);
      } else if (to === ConnectionState.FAILED) {
        console.error(`❌ ComfyUI connection failed after ${this.connectionManager.config.maxReconnectAttempts} attempts`);
      }
    });
    this.connectionManager.on('reconnecting', (attempt) => {
      console.log(`🔄 Reconnecting to ComfyUI (attempt ${attempt})...`);
    });
  }

  /**
   * Open a fresh WebSocket through the connection manager.
   *
   * Any previous socket is disposed first, and executions that were listening
   * on it are rejected because they can no longer receive messages.
   *
   * @returns {Promise<*>} Whatever `connectionManager.connect` resolves with.
   */
  async connect() {
    return this.connectionManager.connect(async () => {
      return new Promise((resolve, reject) => {
        if (this.ws) {
          this.rejectPendingOperations(
            new Error('WebSocket connection was replaced before execution finished')
          );
          disposeSocket(this.ws);
        }

        // Bind the handshake listeners to this specific socket so they can
        // never be removed from (or leak onto) a different one.
        const socket = new WebSocket(`ws://${this.serverAddress}/ws?clientId=${this.clientId}`);
        this.ws = socket;

        const onOpen = () => {
          socket.removeListener('error', onError);
          // A pong timestamp from the previous socket must not make this
          // brand-new connection look stale.
          this.lastPongReceived = null;
          this.setupWebSocketHandlers();
          resolve();
        };
        const onError = (error) => {
          socket.removeListener('open', onOpen);
          reject(error);
        };
        socket.once('open', onOpen);
        socket.once('error', onError);
      });
    });
  }

  /** Attach the long-lived listeners to the current socket and start health checks. */
  setupWebSocketHandlers() {
    // Intentional closes remove all listeners first, so reaching this handler
    // means the socket went away on its own.
    this.ws.on('close', (code, reason) => {
      console.warn(`WebSocket closed unexpectedly: ${code} - ${reason}`);
      this.connectionManager.handleUnexpectedDisconnect(() => this.connect());
    });
    this.ws.on('error', (error) => {
      console.error('WebSocket error:', error.message);
    });
    this.ws.on('pong', () => {
      this.lastPongReceived = Date.now();
    });
    this.startHealthCheck();
  }

  /**
   * Register a ping-based health probe with the connection manager.
   *
   * The probe rejects when the socket is not open, or when a pong has been
   * seen on this socket but the latest one is older than the stale threshold.
   */
  startHealthCheck() {
    const checkHealth = async () => {
      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
        throw new Error('WebSocket not connected');
      }
      this.ws.ping();
      const timeSinceLastPong = Date.now() - (this.lastPongReceived || 0);
      if (this.lastPongReceived && timeSinceLastPong > STALE_PONG_THRESHOLD_MS) {
        throw new Error('WebSocket connection stale (no pong received)');
      }
    };
    this.connectionManager.startHealthCheck(checkHealth);
  }

  /**
   * Reject every execution currently awaited via `waitForExecution`.
   *
   * @param {Error} error - Rejection reason handed to each waiter.
   */
  rejectPendingOperations(error) {
    const operations = Array.from(this.pendingOperations.values());
    this.pendingOperations.clear();
    for (const operation of operations) {
      operation.reject(error);
    }
  }

  /** Close the connection on purpose and fail any execution still being awaited. */
  disconnect() {
    this.connectionManager.disconnect();
    if (this.ws) {
      disposeSocket(this.ws);
      this.ws = null;
    }
    this.rejectPendingOperations(new Error('ComfyUI client disconnected'));
  }

  /** Connect if needed, or wait for a connection attempt that is already running. */
  async ensureConnected() {
    if (this.connectionManager.isConnected) {
      return;
    }
    if (this.connectionManager.isConnecting) {
      await this.connectionManager.waitForConnection();
    } else {
      await this.connect();
    }
  }

  /**
   * @returns {boolean} True when the manager reports a connection and the socket is open.
   */
  isConnected() {
    return Boolean(
      this.connectionManager.isConnected &&
        this.ws &&
        this.ws.readyState === WebSocket.OPEN
    );
  }

  /** @returns {*} The connection manager's status report. */
  getConnectionStatus() {
    return this.connectionManager.getStatus();
  }

  /**
   * Fetch the history record for a prompt.
   *
   * @param {string} promptId
   * @returns {Promise<object>} Parsed `/history/<promptId>` response.
   * @throws {Error} On a non-2xx status or a non-JSON body.
   */
  async getHistory(promptId) {
    await this.ensureConnected();
    const response = await fetch(`http://${this.serverAddress}/history/${promptId}`);
    return readJsonResponse(response, 'history request');
  }

  /**
   * Download one image through the `/view` endpoint.
   *
   * @param {string} filename
   * @param {string} [subfolder] - Omitted from the query when null/undefined.
   * @param {string} [folderType] - Sent as `type`; omitted when null/undefined.
   * @returns {Promise<ArrayBuffer>} Raw image bytes.
   * @throws {Error} On a non-2xx status, so an error page is never returned as image data.
   */
  async getImages(filename, subfolder, folderType) {
    await this.ensureConnected();
    const query = new URLSearchParams();
    for (const [key, value] of Object.entries({ filename, subfolder, type: folderType })) {
      // URLSearchParams would serialise a missing value as the text "undefined".
      if (value !== undefined && value !== null) {
        query.append(key, value);
      }
    }
    const response = await fetch(`http://${this.serverAddress}/view?${query.toString()}`);
    if (!response.ok) {
      throw createHttpError('image download', response, await response.text());
    }
    return response.arrayBuffer();
  }

  /**
   * Upload an image to the server.
   *
   * @param {Buffer} imageBuffer - Image bytes.
   * @param {string} filename - Name to store the upload under.
   * @param {boolean} [overwrite=false] - Sent as the `overwrite` form field.
   * @returns {Promise<object>} Parsed server response.
   * @throws {Error} On a non-2xx status or a non-JSON body.
   */
  async uploadImage(imageBuffer, filename, overwrite = false) {
    await this.ensureConnected();
    const formData = new FormData();
    formData.append('image', imageBuffer, filename);
    formData.append('overwrite', String(overwrite));
    const response = await fetch(`http://${this.serverAddress}/upload/image`, {
      method: 'POST',
      body: formData
    });
    return readJsonResponse(response, 'image upload');
  }

  /**
   * Submit a workflow for execution.
   *
   * @param {object} prompt - Workflow graph in ComfyUI API format.
   * @returns {Promise<object>} Parsed server response. A JSON error body sent
   *   with a non-2xx status is returned as-is (callers must check for
   *   `prompt_id`), which keeps the behavior existing callers rely on.
   * @throws {Error} When the response is non-2xx without a JSON body, or not JSON at all.
   */
  async queuePrompt(prompt) {
    await this.ensureConnected();
    const response = await fetch(`http://${this.serverAddress}/prompt`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ prompt, client_id: this.clientId })
    });
    return readJsonResponse(response, 'prompt submission', { allowErrorBody: true });
  }

  /**
   * Ask the server to interrupt the running execution.
   *
   * @returns {Promise<object>} Parsed response body, or `{}` when the server
   *   answers with an empty body.
   * @throws {Error} On a non-2xx status.
   */
  async interrupt() {
    await this.ensureConnected();
    const response = await fetch(`http://${this.serverAddress}/interrupt`, {
      method: 'POST'
    });
    return readJsonResponse(response, 'interrupt request');
  }

  /**
   * Wait until the server reports that a prompt has finished executing.
   *
   * Resolves on an `executing` message with `node: null` for this prompt.
   * Rejects on `execution_error`, `execution_interrupted` or `error` messages
   * for this prompt, when the socket closes, when the connection manager emits
   * `unexpectedDisconnect`, or when the client disconnects or replaces the
   * socket. All listeners are removed once the promise settles.
   *
   * NOTE(review): messages sent before this method attaches its listener are
   * not seen; callers should start waiting right after queueing the prompt.
   *
   * @param {string} promptId
   * @returns {Promise<Object<string, *>>} Node outputs from `executed` messages, keyed by node id.
   * @throws {Error} If no WebSocket is available or the execution fails as described above.
   */
  async waitForExecution(promptId) {
    await this.ensureConnected();
    // Pin the socket we subscribe to: `this.ws` may point at a different
    // socket by the time the listeners have to be removed.
    const socket = this.ws;
    if (!socket) {
      throw new Error('WebSocket not available');
    }

    return new Promise((resolve, reject) => {
      const outputs = {};
      const operationId = `execution_${promptId}`;
      let settled = false;

      const cleanup = () => {
        socket.removeListener('message', onMessage);
        socket.removeListener('close', onClose);
        this.connectionManager.removeListener('unexpectedDisconnect', onDisconnect);
        if (this.pendingOperations.get(operationId) === operation) {
          this.pendingOperations.delete(operationId);
        }
      };
      const succeed = (value) => {
        if (settled) {
          return;
        }
        settled = true;
        cleanup();
        resolve(value);
      };
      const fail = (error) => {
        if (settled) {
          return;
        }
        settled = true;
        cleanup();
        reject(error);
      };
      // Registered so disconnect()/connect() can fail this wait explicitly.
      const operation = { promptId, resolve: succeed, reject: fail };

      const onMessage = (data, isBinary) => {
        // Binary frames (e.g. preview images) are not JSON status messages.
        if (isBinary === true) {
          return;
        }
        let message;
        try {
          message = JSON.parse(data.toString());
        } catch (error) {
          console.error('Error handling WebSocket message:', error);
          return;
        }
        const payload = message && message.data;
        if (!payload || payload.prompt_id !== promptId) {
          return;
        }
        switch (message.type) {
          case 'executing':
            // `node: null` marks the end of the run for this prompt.
            if (payload.node === null) {
              succeed(outputs);
            }
            break;
          case 'executed':
            outputs[payload.node] = payload.output;
            break;
          case 'execution_error':
            fail(new Error(payload.exception_message || `Execution ${promptId} failed`));
            break;
          case 'execution_interrupted':
            fail(new Error(`Execution ${promptId} was interrupted`));
            break;
          case 'error':
            fail(new Error(payload.error));
            break;
          default:
            break;
        }
      };
      const onClose = () => {
        fail(new Error(`WebSocket closed while waiting for execution ${promptId}`));
      };
      const onDisconnect = () => {
        console.warn(`Connection lost while waiting for execution ${promptId}`);
        // Listeners on the old socket will never fire again, so fail now and
        // let the caller (or the operation queue) decide whether to retry.
        fail(new Error(`Connection lost while waiting for execution ${promptId}`));
      };

      this.pendingOperations.set(operationId, operation);
      this.connectionManager.on('unexpectedDisconnect', onDisconnect);
      socket.on('close', onClose);
      socket.on('message', onMessage);
    });
  }

  /**
   * Run a workflow and save every image it produces.
   *
   * The work is submitted through `connectionManager.queueOperation`.
   *
   * @param {object} workflow - Workflow graph in ComfyUI API format.
   * @param {string} [outputDir='./output'] - Directory for the images; created on
   *   demand before the first image is written.
   * @returns {Promise<Array<{filename: string, path: string, subfolder: string, type: string}>>}
   *   One entry per saved image; `filename` is the server's name and `path` the local file.
   * @throws {Error} If the prompt is not accepted, execution fails, or no history exists.
   */
  async generateImage(workflow, outputDir = './output') {
    return this.connectionManager.queueOperation(async () => {
      try {
        await this.ensureConnected();

        const queued = await this.queuePrompt(workflow);
        const promptId = queued && queued.prompt_id;
        if (!promptId) {
          // Without an id there is nothing to wait for; surface the server's reason.
          const reason = queued && queued.error
            ? JSON.stringify(queued.error)
            : 'response contained no prompt_id';
          throw new Error(`ComfyUI did not accept the prompt: ${reason}`);
        }
        console.log('Prompt queued with ID:', promptId);

        await this.waitForExecution(promptId);

        const history = await this.getHistory(promptId);
        const entry = history[promptId];
        if (!entry) {
          throw new Error('No history found for prompt ID');
        }

        const images = [];
        let outputDirReady = false;
        for (const nodeOutput of Object.values(entry.outputs || {})) {
          if (!nodeOutput || !nodeOutput.images) {
            continue;
          }
          for (const image of nodeOutput.images) {
            const imageData = await this.getImages(image.filename, image.subfolder, image.type);
            if (!outputDirReady) {
              await fs.promises.mkdir(outputDir, { recursive: true });
              outputDirReady = true;
            }
            // The filename comes from the server; keep only its last path
            // segment so the file cannot be written outside outputDir.
            const outputPath = path.join(outputDir, path.basename(image.filename));
            await fs.promises.writeFile(outputPath, Buffer.from(imageData));
            images.push({
              filename: image.filename,
              path: outputPath,
              subfolder: image.subfolder,
              type: image.type
            });
          }
        }
        return images;
      } catch (error) {
        console.error('Error generating image:', error.message);
        throw error;
      }
    });
  }
}