"""Workflow-related route functions for ComfyUI API."""
import httpx
from src.auth.base import ComfyAuth
from src.client import get_data as gd
from src.client.response import ResponseGetData
from src.utils.logging import log_call
class WorkflowError(Exception):
    """Error signalling that a workflow route call did not succeed.

    Attributes:
        response: The ResponseGetData tied to the failure, or None when
            the raiser did not supply one.
    """

    def __init__(self, message: str, response: ResponseGetData | None = None):
        # Keep the response on the exception so handlers can inspect it.
        self.response = response
        super().__init__(message)
@log_call(action_name="queue_workflow", level_name="route")
async def queue_workflow(
    auth: ComfyAuth,
    workflow: dict,
    *,
    session: httpx.AsyncClient | None = None,
) -> ResponseGetData:
    """Queue a workflow for execution on ComfyUI.

    Thin route wrapper: POSTs ``{"prompt": workflow}`` to the ``/prompt``
    endpoint under ``auth.base_url`` and hands back the client response.
    No processing beyond the success check is performed here.

    Args:
        auth: ComfyAuth instance supplying the base URL and authentication.
        workflow: Workflow definition dictionary (nodes, connections, etc.).
        session: Optional HTTPX client, passed through for connection pooling.

    Returns:
        ResponseGetData where, on success:
            - response: {"prompt_id": "...", "number": ...}
            - status: 200
            - is_success: True

    Raises:
        WorkflowError: If the response is not successful. The failed
            response is attached as ``.response``.

    Example:
        >>> auth = NoAuth("http://127.0.0.1:8188")
        >>> res = await queue_workflow(
        ...     auth=auth,
        ...     workflow={"3": {"class_type": "KSampler", ...}},
        ... )
        >>> print(res.response["prompt_id"])
        "abc-123-def-456"
    """
    result = await gd(
        auth=auth,
        method="POST",
        url=f"{auth.base_url}/prompt",
        body={"prompt": workflow},
        session=session,
    )

    # Happy path first; anything else becomes a WorkflowError.
    if result.is_success:
        return result

    raise WorkflowError(
        f"Failed to queue workflow: HTTP {result.status}",
        response=result,
    )
@log_call(action_name="get_history", level_name="route")
async def get_history(
    auth: ComfyAuth,
    *,
    session: httpx.AsyncClient | None = None,
) -> ResponseGetData:
    """Fetch the complete execution history from ComfyUI.

    Issues a GET against the ``/history`` endpoint under ``auth.base_url``
    and returns the client response untouched when it succeeded.

    Args:
        auth: ComfyAuth instance supplying the base URL and authentication.
        session: Optional HTTPX client, passed through for connection pooling.

    Returns:
        ResponseGetData where, on success:
            - response: Dict mapping prompt_id -> execution details
            - status: 200
            - is_success: True

    Raises:
        WorkflowError: If the response is not successful. The failed
            response is attached as ``.response``.

    Example:
        >>> auth = NoAuth("http://127.0.0.1:8188")
        >>> res = await get_history(auth=auth)
        >>> print(list(res.response.keys()))
        ["abc-123", "def-456", ...]
    """
    history_url = f"{auth.base_url}/history"
    outcome = await gd(
        auth=auth,
        method="GET",
        url=history_url,
        session=session,
    )

    if outcome.is_success:
        return outcome

    raise WorkflowError(
        f"Failed to get history: HTTP {outcome.status}",
        response=outcome,
    )
@log_call(action_name="get_prompt_history", level_name="route")
async def get_prompt_history(
    auth: ComfyAuth,
    prompt_id: str,
    *,
    session: httpx.AsyncClient | None = None,
) -> ResponseGetData:
    """Get execution history for a specific prompt.

    Issues a GET against ``/history/<prompt_id>`` under ``auth.base_url``.
    The prompt ID is percent-encoded as a single path segment, so an ID
    containing ``/``, ``?``, ``#`` or whitespace cannot change which
    endpoint is requested. IDs made only of unreserved characters (such as
    UUIDs) are sent unchanged.

    Args:
        auth: ComfyAuth instance supplying the base URL and authentication.
        prompt_id: Prompt ID from the queue_workflow response.
        session: Optional HTTPX client, passed through for connection pooling.

    Returns:
        ResponseGetData where, on success:
            - response: Dict with prompt execution details
            - status: 200
            - is_success: True

    Raises:
        WorkflowError: If the response is not successful. The failed
            response is attached as ``.response``.

    Example:
        >>> auth = NoAuth("http://127.0.0.1:8188")
        >>> res = await get_prompt_history(auth=auth, prompt_id="abc-123")
        >>> print(res.response["abc-123"]["status"]["completed"])
        True
    """
    # Local import keeps this block self-contained; urllib is stdlib.
    from urllib.parse import quote

    # safe="" also encodes "/", so the ID always stays one path segment.
    # str() keeps non-str IDs (e.g. uuid.UUID) working as the f-string did.
    encoded_prompt_id = quote(str(prompt_id), safe="")
    url = f"{auth.base_url}/history/{encoded_prompt_id}"

    res = await gd(
        auth=auth,
        method="GET",
        url=url,
        session=session,
    )
    if not res.is_success:
        # Report the caller's original ID, not the encoded form.
        raise WorkflowError(
            f"Failed to get prompt history for {prompt_id}: HTTP {res.status}",
            response=res,
        )
    return res