from __future__ import annotations

import asyncio
import concurrent.futures
import inspect
import logging
import os
import sys
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from .workflow_utils import normalize_workflow, apply_dryrun_overrides
@dataclass(frozen=True)
class NodeSchema:
    """Immutable snapshot of the metadata read from one ComfyUI node class.

    Apart from ``node_type`` and ``input``, every field holds whatever the
    node class exposes for the matching attribute, so the values are kept
    untyped (``Any``) and may be ``None`` when the attribute is missing.
    """

    # Key of the node class in NODE_CLASS_MAPPINGS.
    node_type: str
    # Result of the node class's INPUT_TYPES() call (empty dict on failure).
    input: Dict[str, Any]
    # Value of the node class's RETURN_TYPES attribute.
    output: Any
    # Value of the node class's OUTPUT_IS_LIST attribute.
    output_is_list: Any
    # Value of the node class's RETURN_NAMES attribute.
    output_name: Any
    # Value of the node class's CATEGORY attribute.
    category: Any
    # Value of the node class's DESCRIPTION attribute.
    description: Any
class ComfyImportError(RuntimeError):
    """Raised when the ComfyUI checkout cannot be located or its modules are not loaded."""
class ComfyBridge:
    """
    Direct-import bridge into a local ComfyUI checkout.

    Loads:
      - nodes.NODE_CLASS_MAPPINGS
      - node classes' INPUT_TYPES(), RETURN_TYPES, OUTPUT_NODE, etc.
      - execution.validate_prompt (for prompt validation)

    Optional:
      - execution via internal executor (best-effort; disabled by default)
    """

    _log = logging.getLogger(__name__)

    # Parameter names under which validate_prompt has been seen to take the prompt.
    _PROMPT_PARAM_NAMES = ("prompt", "prompt_json", "prompt_data", "workflow", "graph", "p")

    def __init__(self, comfyui_root: str, enable_execute: bool = False) -> None:
        """Import ComfyUI from ``comfyui_root``.

        Args:
            comfyui_root: Path to the ComfyUI checkout; stored as an absolute path.
            enable_execute: When false, :meth:`dry_run` refuses to execute anything.

        Raises:
            ComfyImportError: If the directory does not exist or the core
                ComfyUI modules cannot be imported.
        """
        self.comfyui_root = os.path.abspath(comfyui_root)
        self.enable_execute = enable_execute
        self._nodes_mod = None
        self._execution_mod = None
        self._server_mod = None
        self._ensure_import_path()
        self._import_comfy()

    def _ensure_import_path(self) -> None:
        """Put the ComfyUI root at the front of ``sys.path`` (once).

        Raises:
            ComfyImportError: If ``comfyui_root`` is not an existing directory.
        """
        if not os.path.isdir(self.comfyui_root):
            raise ComfyImportError(f"COMFYUI_ROOT not found or not a directory: {self.comfyui_root}")
        if self.comfyui_root not in sys.path:
            sys.path.insert(0, self.comfyui_root)

    @staticmethod
    def _run_awaitable(awaitable: Any) -> Any:
        """Drive ``awaitable`` to completion from synchronous code and return its result.

        ``asyncio.run`` is used when no event loop is running in the current
        thread. If one is already running (e.g. the bridge is used from inside
        an async server), ``asyncio.run`` / ``run_until_complete`` would raise,
        so the awaitable is run on a fresh loop in a short-lived worker thread
        while this thread blocks on the result.
        """

        async def _drive() -> Any:
            return await awaitable

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop running in this thread: the simple path is safe.
            return asyncio.run(_drive())

        # NOTE: this blocks the caller's running loop until the work finishes;
        # the awaitable must not depend on that loop making progress.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, _drive()).result()

    def _init_custom_nodes(self, nodes_mod: Any) -> None:
        """Best-effort call of ``nodes.init_extra_nodes`` (sync or async variants).

        Failures are logged and otherwise ignored so that the core node set
        stays usable even when a custom node pack is broken.
        """
        init = getattr(nodes_mod, "init_extra_nodes", None)
        if not callable(init):
            return
        try:
            result = init()
            # Some ComfyUI versions define the hook as `async def`.
            if inspect.isawaitable(result):
                self._run_awaitable(result)
        except Exception:
            self._log.warning(
                "ComfyUI custom node initialization failed; continuing with core nodes only.",
                exc_info=True,
            )

    def _import_comfy(self) -> None:
        """Import ``nodes`` and ``execution`` (required) and ``server`` (optional).

        Raises:
            ComfyImportError: If ``nodes`` or ``execution`` cannot be imported.
        """
        try:
            import nodes  # type: ignore
            import execution  # type: ignore
        except Exception as e:
            raise ComfyImportError(
                "Failed importing ComfyUI modules. Ensure COMFYUI_ROOT points to the ComfyUI repo "
                "and your Python env has ComfyUI deps installed."
            ) from e

        # Best-effort: load custom nodes if ComfyUI exposes the init hook.
        self._init_custom_nodes(nodes)

        self._nodes_mod = nodes
        self._execution_mod = execution

        # server.py can be import-heavy; keep it optional.
        try:
            import server  # type: ignore
        except Exception:
            self._log.debug("Optional ComfyUI 'server' module could not be imported.", exc_info=True)
            self._server_mod = None
        else:
            self._server_mod = server

    @property
    def nodes(self):
        """The imported ComfyUI ``nodes`` module; raises ComfyImportError if not loaded."""
        if self._nodes_mod is None:
            raise ComfyImportError("ComfyUI nodes not loaded.")
        return self._nodes_mod

    @property
    def execution(self):
        """The imported ComfyUI ``execution`` module; raises ComfyImportError if not loaded."""
        if self._execution_mod is None:
            raise ComfyImportError("ComfyUI execution not loaded.")
        return self._execution_mod

    def list_nodes(self) -> List[str]:
        """Return the registered node type names, sorted case-insensitively.

        Returns an empty list when ``NODE_CLASS_MAPPINGS`` is missing or is not a dict.
        """
        mappings = getattr(self.nodes, "NODE_CLASS_MAPPINGS", None)
        if not isinstance(mappings, dict):
            return []
        return sorted(mappings, key=str.lower)

    def node_schema(self, node_type: str) -> "NodeSchema":
        """Collect the schema attributes of one registered node class.

        Args:
            node_type: Key in ``NODE_CLASS_MAPPINGS``.

        Returns:
            A ``NodeSchema``; attributes the class does not define are ``None``
            and ``input`` is ``{}`` when ``INPUT_TYPES()`` is absent or raises.

        Raises:
            KeyError: If ``node_type`` is not registered.
        """
        mappings = getattr(self.nodes, "NODE_CLASS_MAPPINGS", None)
        if not isinstance(mappings, dict) or node_type not in mappings:
            raise KeyError(f"Unknown node_type: {node_type}")
        cls = mappings[node_type]

        input_types: Dict[str, Any] = {}
        input_types_fn = getattr(cls, "INPUT_TYPES", None)
        if callable(input_types_fn):
            try:
                input_types = input_types_fn()
            except Exception:
                # A broken node must not make schema inspection fail; report no inputs.
                self._log.debug("INPUT_TYPES() failed for node %r", node_type, exc_info=True)
                input_types = {}

        return NodeSchema(
            node_type=node_type,
            input=input_types,
            output=getattr(cls, "RETURN_TYPES", None),
            output_is_list=getattr(cls, "OUTPUT_IS_LIST", None),
            output_name=getattr(cls, "RETURN_NAMES", None),
            category=getattr(cls, "CATEGORY", None),
            description=getattr(cls, "DESCRIPTION", None),
        )

    def _call_validate_prompt(self, prompt: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]:
        """
        Call ComfyUI's ``validate_prompt`` and normalize its result.

        The signature varies across versions, so it is introspected: the prompt
        is passed under whichever known parameter name is present, otherwise as
        the single positional argument. Coroutine results (``async def``
        variants) are awaited to completion.

        Expected output (typical):
            (ok: bool, error: dict, good_outputs: list, node_errors: dict)

        Returns:
            ``(ok, details)`` where ``details`` always has ``raw_result_type``
            plus ``error`` / ``good_outputs`` / ``node_errors`` for tuple
            results, the dict contents for dict results, or ``raw_result``
            (a ``repr``) for anything else.

        Raises:
            RuntimeError: If ``validate_prompt`` is missing, or the positional
                fallback call fails with ``TypeError``.
        """
        vp = getattr(self.execution, "validate_prompt", None)
        if not callable(vp):
            raise RuntimeError("ComfyUI execution.validate_prompt not found.")

        params = inspect.signature(vp).parameters
        kwargs: Dict[str, Any] = {
            name: prompt for name in params if name in self._PROMPT_PARAM_NAMES
        }

        if kwargs:
            # NOTE(review): newer ComfyUI releases appear to declare
            # validate_prompt(prompt_id, prompt, partial_execution_list);
            # supply neutral values for those when they are required. Confirm
            # against the installed ComfyUI version.
            empty = inspect.Parameter.empty
            if "prompt_id" in params and params["prompt_id"].default is empty:
                kwargs["prompt_id"] = uuid.uuid4().hex
            if (
                "partial_execution_list" in params
                and params["partial_execution_list"].default is empty
            ):
                kwargs["partial_execution_list"] = None
            res = vp(**kwargs)
        else:
            # Fallback: first positional argument is the prompt.
            try:
                res = vp(prompt)
            except TypeError as e:
                raise RuntimeError(f"validate_prompt invocation failed: {e}") from e

        # An `async def validate_prompt` hands back a coroutine; without
        # awaiting it every workflow would be reported as invalid.
        if inspect.isawaitable(res):
            res = self._run_awaitable(res)

        # Normalize result
        details: Dict[str, Any] = {"raw_result_type": str(type(res))}
        ok = False
        if isinstance(res, tuple) and len(res) >= 2:
            ok = bool(res[0])
            details["error"] = res[1]
            if len(res) >= 3:
                details["good_outputs"] = res[2]
            if len(res) >= 4:
                details["node_errors"] = res[3]
        elif isinstance(res, dict):
            # Some forks return dict
            ok = bool(res.get("ok", False))
            details.update(res)
        else:
            details["raw_result"] = repr(res)
        return ok, details

    def validate_workflow(self, workflow_json: Any) -> Dict[str, Any]:
        """Normalize ``workflow_json`` and validate it; returns ``{"ok", "details"}``."""
        prompt = normalize_workflow(workflow_json)
        ok, details = self._call_validate_prompt(prompt)
        return {"ok": ok, "details": details}

    @staticmethod
    def _signature_accepts(func: Any, args: Tuple[Any, ...]) -> Optional[bool]:
        """Tell whether ``func`` can be called with ``args``; ``None`` if it cannot be introspected."""
        try:
            sig = inspect.signature(func)
        except (TypeError, ValueError):
            return None
        try:
            sig.bind(*args)
        except TypeError:
            return False
        return True

    def _try_execute(self, executor: Any, prompt: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Run ``executor.execute`` once with the first call pattern that fits.

        Returns:
            ``(executed, error)`` where ``error`` is a ``"Type: message"`` string
            describing the last failure, or ``None`` on success.
        """
        # Some versions expect (prompt, extra_data, client_id) or similar.
        call_patterns: Tuple[Tuple[Any, ...], ...] = (
            (prompt,),
            (prompt, {}),
            (prompt, {}, "mcp"),
        )
        last_err: Optional[str] = None
        for args in call_patterns:
            fits = self._signature_accepts(executor.execute, args)
            if fits is False:
                # Rule the pattern out without calling, so a mismatch never
                # triggers a real (possibly heavy) execution attempt.
                last_err = f"TypeError: execute() does not accept {len(args)} positional argument(s)"
                continue
            try:
                executor.execute(*args)
            except TypeError as e:
                last_err = f"{type(e).__name__}: {e}"
                if fits is None:
                    # Arity unknown: the mismatch may be the cause, try the next pattern.
                    continue
                return False, last_err
            except Exception as e:
                # A genuine execution failure: do not re-run the workflow.
                return False, f"{type(e).__name__}: {e}"

            # NOTE(review): ComfyUI's PromptExecutor appears to flag failures
            # via a `success` attribute instead of raising; honor it when present.
            if getattr(executor, "success", None) is False:
                return False, "PromptExecutor reported success=False"
            return True, None
        return False, last_err

    def dry_run(self, workflow_json: Any, *, steps: Optional[int] = 2) -> Dict[str, Any]:
        """
        Optional execution attempt.

        Behavior:
          - If the bridge was created with ``enable_execute`` false: return a
            ``disabled`` result without touching the workflow.
          - Else: normalize, apply dry-run overrides, validate, then make a
            best-effort run through ComfyUI's internal ``PromptExecutor``.
            This may load models and use GPU/CPU.

        The "dry_run" name is aspirational: ComfyUI has no native dry-run flag.

        Args:
            workflow_json: Workflow in any form ``normalize_workflow`` accepts.
            steps: Forwarded to ``apply_dryrun_overrides`` (presumably a
                sampler step cap; see that helper).

        Returns:
            A dict with ``ok`` and, depending on the outcome, ``disabled``,
            ``validated``, ``executed``, ``message``, ``details`` and
            ``execution_error``. Execution failures after a successful
            validation still report ``ok: True`` with ``executed: False``.
        """
        if not self.enable_execute:
            return {"ok": False, "disabled": True, "message": "Execution disabled. Set COMFYUI_ENABLE_EXECUTE=1."}

        prompt = normalize_workflow(workflow_json)
        prompt = apply_dryrun_overrides(prompt, steps=steps)
        ok, details = self._call_validate_prompt(prompt)
        if not ok:
            return {"ok": False, "validated": False, "details": details}

        # Best-effort: attempt to execute the prompt via the internal API if available.
        # This is intentionally conservative; ComfyUI internals change often.
        try:
            executor_cls = getattr(self.execution, "PromptExecutor", None)
            if executor_cls is None:
                return {
                    "ok": True,
                    "validated": True,
                    "executed": False,
                    "message": "Validated OK. No PromptExecutor available in this ComfyUI version.",
                }
            executor = executor_cls()
            executed, last_err = self._try_execute(executor, prompt)
        except Exception as e:
            return {
                "ok": True,
                "validated": True,
                "executed": False,
                "message": "Validated OK. Execution failed.",
                "execution_error": f"{type(e).__name__}: {e}",
            }

        if not executed:
            return {
                "ok": True,
                "validated": True,
                "executed": False,
                "message": "Validated OK. Execution attempt failed (best-effort).",
                "execution_error": last_err,
            }
        return {"ok": True, "validated": True, "executed": True}