async def _wait_for_completion(self, prompt_id: str, timeout: float = 600.0) -> dict:
"""Wait for the prompt to complete via WebSocket."""
print(f"[ComfyUI] Waiting for completion of prompt {prompt_id}", file=sys.stderr)
print(f"[ComfyUI] Connecting to WebSocket: {self.ws_url}", file=sys.stderr)
try:
async with websockets.connect(self.ws_url) as ws:
print(f"[ComfyUI] WebSocket connected successfully", file=sys.stderr)
loop = asyncio.get_running_loop()
start_time = loop.time()
while True:
if loop.time() - start_time > timeout:
raise TimeoutError(f"Generation timed out after {timeout}s")
try:
message = await asyncio.wait_for(ws.recv(), timeout=5.0)
data = json.loads(message)
# Debug: log all messages
print(f"[ComfyUI] WS message: {data.get('type')} for {data.get('data', {}).get('prompt_id')}", file=sys.stderr)
print(f"[ComfyUI] Full WS data: {data}", file=sys.stderr)
if data.get("type") == "executing":
exec_data = data.get("data", {})
if exec_data.get("prompt_id") == prompt_id:
if exec_data.get("node") is None:
# Execution complete
print(f"[ComfyUI] Prompt {prompt_id} completed, fetching history", file=sys.stderr)
break
elif data.get("type") == "execution_error":
error_data = data.get("data", {})
if error_data.get("prompt_id") == prompt_id:
raise RuntimeError(f"ComfyUI error: {error_data}")
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"[ComfyUI] WebSocket error: {e}", file=sys.stderr)
raise
# Get the history to find the output
print(f"[ComfyUI] Fetching history for {prompt_id}", file=sys.stderr)
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.get(f"{self.base_url}/history/{prompt_id}")
response.raise_for_status()
history = response.json()
print(f"[ComfyUI] History fetched, keys: {list(history.keys())}", file=sys.stderr)
return history
except Exception as e:
print(f"[ComfyUI] WebSocket connection failed: {e}", file=sys.stderr)
raise