"""Workflow execution tools.
Impact: Critical (core functionality)
Complexity: High (polling, error handling, image retrieval)
"""
import json
import urllib
from pathlib import Path
from mcp.server.fastmcp import Context
from mcp.server.fastmcp.utilities.types import Image
from pydantic import Field
from ..api import comfy_get, comfy_post, get_file_url, poll_for_result
from ..models import ErrorResponse
from ..settings import settings
def is_ui_format(workflow: dict) -> bool:
    """Report whether *workflow* looks like a UI-format export rather than API format.

    The check is purely key-based: a top-level ``"nodes"`` or ``"version"``
    key marks the workflow as UI format; without either key the result is False.
    """
    ui_markers = ("nodes", "version")
    return any(marker in workflow for marker in ui_markers)
def _apply_input_overrides(workflow: dict, inputs: dict) -> None:
    """Merge per-node input overrides into an API-format workflow, in place.

    ``inputs`` maps a node ID to either a dict of input values (merged into
    that node's ``"inputs"``) or a bare value, which replaces the node's
    ``"text"`` input when the node already has one. Node IDs that are not
    present in the workflow are skipped.
    """
    for node_id, values in inputs.items():
        # Keys of a JSON-loaded workflow are strings, so normalise e.g. 6 -> "6".
        node = workflow.get(str(node_id))
        if not isinstance(node, dict):
            continue
        if isinstance(values, dict):
            # setdefault: a node without an "inputs" key must not raise KeyError.
            node.setdefault("inputs", {}).update(values)
        elif "text" in node.get("inputs", {}):
            # Simple value - only overrides an existing text input.
            node["inputs"]["text"] = values


def register_execution_tools(mcp):
    """Register workflow execution tools.

    Args:
        mcp: Server object whose ``tool()`` decorator registers each of the
            functions defined below.

    Registers: run_workflow, execute_workflow, generate_image,
    submit_workflow, get_prompt_status and get_result_image.
    """
    # NOTE(review): in the MCP Python SDK ``Context.info`` appears to be a
    # coroutine function; the un-awaited ``ctx.info(...)`` calls in these
    # synchronous tools may therefore never emit anything - confirm.
    # NOTE(review): FastMCP presumably exposes each tool's docstring as its
    # description, so the tool docstrings below are kept verbatim.

    # Tool: load a workflow file from settings.workflows_dir, apply optional
    # input overrides, then execute it.
    @mcp.tool()
    def run_workflow(
        workflow_name: str = Field(description="Workflow filename"),
        inputs: dict = Field(default=None, description="Node input overrides"),
        output_node_id: str = Field(default=None, description="Output node ID"),
        ctx: Context = None,
    ):
        """Execute a saved workflow file.

        Args:
            workflow_name: Workflow filename (e.g., 'flux-dev.json')
            inputs: Optional input overrides, e.g., {"6": {"text": "new prompt"}}
            output_node_id: Node ID to get output from (uses default if not set)

        Returns the generated image or error message.
        """
        if not settings.workflows_dir:
            return "Error: COMFY_WORKFLOWS_DIR not configured"

        # workflow_name comes from the tool caller: refuse absolute paths and
        # ".." segments so it cannot address files outside workflows_dir.
        requested = Path(workflow_name)
        if requested.anchor or ".." in requested.parts:
            return f"Error: Invalid workflow name '{workflow_name}'"

        wf_path = Path(settings.workflows_dir) / requested
        # is_file() rather than exists(): a directory would fail in open().
        if not wf_path.is_file():
            return f"Error: Workflow '{workflow_name}' not found"

        if ctx:
            ctx.info(f"Loading workflow: {workflow_name}")

        try:
            # Explicit UTF-8 so decoding does not depend on the platform locale.
            with open(wf_path, encoding="utf-8") as f:
                workflow = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return f"Error: Could not load workflow '{workflow_name}': {exc}"

        if not isinstance(workflow, dict):
            return f"Error: Workflow '{workflow_name}' is not a JSON object"

        # Check for UI format workflows
        if is_ui_format(workflow):
            return (
                f"Error: Workflow '{workflow_name}' is in UI format (has nodes/widgets_values). "
                "UI format uses positional arrays that can cause parameter misalignment errors. "
                "Please re-export the workflow from ComfyUI using 'Export (API Format)' or use "
                "convert_workflow_to_ui() to create a UI version from an API format workflow."
            )

        # Apply input overrides
        if inputs:
            _apply_input_overrides(workflow, inputs)

        out_node = output_node_id or settings.output_node_id
        if not out_node:
            return "Error: No output_node_id specified"

        return _execute_workflow(workflow, out_node, ctx)

    # Tool: execute a caller-supplied workflow dict as-is.
    @mcp.tool()
    def execute_workflow(
        workflow: dict = Field(description="Complete workflow dict"),
        output_node_id: str = Field(description="Node ID to get output from"),
        ctx: Context = None,
    ):
        """Execute an arbitrary workflow dict.

        Args:
            workflow: Workflow dict in ComfyUI API format
            output_node_id: Node ID that outputs the final image

        Returns the generated image or error message.
        Use this for programmatically built workflows.
        """
        # Check for UI format workflows
        if is_ui_format(workflow):
            return (
                "Error: Workflow is in UI format (has nodes/widgets_values). "
                "UI format uses positional arrays that can cause parameter misalignment errors. "
                "Please provide workflow in API format with explicit 'class_type' and 'inputs'."
            )

        if ctx:
            ctx.info("Executing custom workflow...")

        return _execute_workflow(workflow, output_node_id, ctx)

    # Tool: run the configured default workflow with the prompt text replaced.
    @mcp.tool()
    def generate_image(
        prompt: str = Field(description="Text prompt for image generation"),
        ctx: Context = None,
    ):
        """Generate an image using the default workflow.

        This is a simplified interface for quick image generation.
        Requires COMFY_WORKFLOW_JSON_FILE, PROMPT_NODE_ID, and OUTPUT_NODE_ID
        to be configured.
        For more control, use run_workflow() or execute_workflow().

        Args:
            prompt: Text description of the image to generate
        """
        if not settings.workflow_json_file:
            return "Error: COMFY_WORKFLOW_JSON_FILE not configured"
        if not settings.prompt_node_id:
            return "Error: PROMPT_NODE_ID not configured"
        if not settings.output_node_id:
            return "Error: OUTPUT_NODE_ID not configured"

        try:
            with open(settings.workflow_json_file, encoding="utf-8") as f:
                workflow = json.load(f)
        except (OSError, ValueError) as exc:
            return f"Error: Could not load default workflow: {exc}"

        if not isinstance(workflow, dict):
            return "Error: Default workflow is not a JSON object"

        # Same UI-format guard as run_workflow()/execute_workflow().
        if is_ui_format(workflow):
            return (
                "Error: Workflow is in UI format (has nodes/widgets_values). "
                "UI format uses positional arrays that can cause parameter misalignment errors. "
                "Please provide workflow in API format with explicit 'class_type' and 'inputs'."
            )

        # Report a missing prompt node as an error string instead of a KeyError.
        prompt_node = workflow.get(str(settings.prompt_node_id))
        if not isinstance(prompt_node, dict):
            return f"Error: Prompt node '{settings.prompt_node_id}' not found in workflow"
        prompt_node.setdefault("inputs", {})["text"] = prompt

        if ctx:
            ctx.info(f"Generating: {prompt[:50]}...")

        return _execute_workflow(workflow, settings.output_node_id, ctx)

    # Tool: POST the workflow to /prompt and return immediately.
    @mcp.tool()
    def submit_workflow(
        workflow: dict = Field(description="Workflow to submit"),
        ctx: Context = None,
    ) -> dict:
        """Submit a workflow without waiting for completion.

        Args:
            workflow: Workflow dict to execute

        Returns the prompt_id for tracking.
        Use get_history() or get_prompt_status() to check completion.
        """
        if ctx:
            ctx.info("Submitting workflow...")

        status, resp = comfy_post("/prompt", {"prompt": workflow})
        if status != 200:
            return ErrorResponse(
                error=f"Submit failed: status {status}",
                code="SUBMIT_FAILED",
                details=resp,
            ).model_dump()

        return {
            "prompt_id": resp.get("prompt_id"),
            "number": resp.get("number"),
            "node_errors": resp.get("node_errors", {}),
        }

    # Tool: summarise the /history entry for a prompt.
    @mcp.tool()
    def get_prompt_status(
        prompt_id: str = Field(description="Prompt ID to check"),
        ctx: Context = None,
    ) -> dict:
        """Get the status of a submitted prompt.

        Args:
            prompt_id: The prompt ID from submit_workflow()

        Returns status information including completion state.
        """
        if ctx:
            ctx.info(f"Checking status: {prompt_id}")

        try:
            history = comfy_get(f"/history/{prompt_id}")
            # A prompt with no history entry is reported as pending.
            if prompt_id not in history:
                return {"status": "pending", "completed": False}

            entry = history[prompt_id]
            status = entry.get("status", {})
            return {
                "status": status.get("status_str", "unknown"),
                "completed": status.get("completed", False),
                "messages": status.get("messages", []),
                "has_outputs": bool(entry.get("outputs")),
            }
        except Exception as e:
            return ErrorResponse.unavailable(str(e)).model_dump()

    # Tool: download the first image a node produced for a completed prompt.
    @mcp.tool()
    def get_result_image(
        prompt_id: str = Field(description="Prompt ID"),
        output_node_id: str = Field(description="Output node ID"),
        ctx: Context = None,
    ):
        """Get the result image from a completed prompt.

        Args:
            prompt_id: The prompt ID from submit_workflow()
            output_node_id: Node ID that produced the image

        Returns the image if available, or error message.
        """
        if ctx:
            ctx.info(f"Getting result: {prompt_id}")

        try:
            history = comfy_get(f"/history/{prompt_id}")
            if prompt_id not in history:
                return "Prompt not found in history"

            entry = history[prompt_id]
            status = entry.get("status", {})
            if not status.get("completed"):
                return "Prompt not yet completed"

            outputs = entry.get("outputs", {})
            if output_node_id not in outputs:
                return f"No output from node {output_node_id}"

            images = outputs[output_node_id].get("images", [])
            if not images:
                return "No images in output"

            # Download image (only the first one listed for the node).
            url = get_file_url(settings.comfy_url, images[0])
            # download_file is not among this module's top-level imports.
            from ..api import download_file

            image_data = download_file(url)
            if image_data:
                return Image(data=image_data, format="png")
            return "Failed to download image"
        except Exception as e:
            return f"Error: {e}"
def _execute_workflow(workflow: dict, output_node_id: str, ctx: Context | None):
    """Internal function to execute workflow and return result.

    Args:
        workflow: Workflow dict sent as the ``prompt`` payload of ``POST /prompt``.
        output_node_id: ID of the node whose output is polled for.
        ctx: Optional MCP context used for progress messages.

    Returns:
        An ``Image`` built from the polled bytes on success; the value of
        ``get_file_url`` instead when ``settings.output_mode`` is ``"url"``
        and the history entry lists an image for the node; otherwise an
        error string.
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import urlencode

    # Submit workflow
    status, resp_data = comfy_post("/prompt", {"prompt": workflow})
    if status != 200:
        # Guard against a non-dict body so error reporting itself cannot raise.
        if isinstance(resp_data, dict):
            error_msg = resp_data.get("error", f"status {status}")
        else:
            error_msg = resp_data or f"status {status}"
        return f"Failed to submit workflow: {error_msg}"

    if not isinstance(resp_data, dict):
        return "Failed to get prompt_id from response"

    prompt_id = resp_data.get("prompt_id")
    if not prompt_id:
        node_errors = resp_data.get("node_errors", {})
        if node_errors:
            return f"Workflow validation failed:\n{json.dumps(node_errors, indent=2)}"
        return "Failed to get prompt_id from response"

    if ctx:
        ctx.info(f"Submitted: {prompt_id}")

    # Poll callback for progress logging (every fifth attempt).
    def on_poll(attempt: int, max_attempts: int):
        if ctx and attempt % 5 == 0:
            ctx.info(f"Waiting... ({attempt}/{max_attempts})")

    # Poll for result
    image_data = poll_for_result(prompt_id, output_node_id, on_poll=on_poll)
    if not image_data:
        return "Failed to generate image. Use get_queue_status() and get_history() to debug."

    if ctx:
        ctx.info("Image generated successfully")

    # Tolerate an unset output_mode instead of raising AttributeError.
    if (settings.output_mode or "").lower() == "url":
        # Return URL instead of image data
        history = comfy_get(f"/history/{prompt_id}")
        outputs = history.get(prompt_id, {}).get("outputs", {})
        images = outputs.get(output_node_id, {}).get("images", [])
        if images:
            # NOTE(review): get_result_image() passes the raw image dict to
            # get_file_url, while this call passes an urlencoded string -
            # confirm which form get_file_url expects.
            return get_file_url(settings.comfy_url_external, urlencode(images[0]))
        # No image listed in history: fall through to the downloaded bytes.

    return Image(data=image_data, format="png")