Skip to main content
Glama
execution.py (10.6 kB)
"""Workflow execution tools.

Impact: Critical (core functionality)
Complexity: High (polling, error handling, image retrieval)
"""

import json
import urllib.parse  # import the submodule explicitly; bare `import urllib` does not guarantee it
from pathlib import Path

from mcp.server.fastmcp import Context, Image
from pydantic import Field

from ..api import comfy_get, comfy_post, get_file_url, poll_for_result
from ..models import ErrorResponse
from ..settings import settings


def is_ui_format(workflow: dict) -> bool:
    """Detect if workflow is in UI format (has nodes/links) vs API format (has class_type/inputs)."""
    return "nodes" in workflow or "version" in workflow


def _load_workflow_file(path) -> dict:
    """Read a workflow JSON file and return it as a dict.

    Args:
        path: Filesystem path (str or Path) of the JSON file.

    Raises:
        ValueError: If the file cannot be read, is not valid UTF-8 JSON, or
            its top-level value is not a JSON object.
    """
    try:
        # Explicit UTF-8 so the result does not depend on the platform locale.
        with open(path, encoding="utf-8") as f:
            workflow = json.load(f)
    except OSError as e:
        raise ValueError(f"could not read workflow file: {e}") from e
    except json.JSONDecodeError as e:
        raise ValueError(f"invalid JSON: {e}") from e
    if not isinstance(workflow, dict):
        raise ValueError("workflow JSON must be an object")
    return workflow


def register_execution_tools(mcp):
    """Register workflow execution tools."""
    # NOTE(review): the tools below are synchronous and call ctx.info() without
    # awaiting it. In the MCP Python SDK, Context.info appears to be declared as
    # a coroutine, in which case these calls would not emit any log message.
    # Confirm against the installed SDK version before relying on the logs.

    @mcp.tool()
    def run_workflow(
        workflow_name: str = Field(description="Workflow filename"),
        inputs: dict = Field(default=None, description="Node input overrides"),
        output_node_id: str = Field(default=None, description="Output node ID"),
        ctx: Context = None,
    ) -> Image | str:
        """Execute a saved workflow file.

        Args:
            workflow_name: Workflow filename (e.g., 'flux-dev.json')
            inputs: Optional input overrides, e.g., {"6": {"text": "new prompt"}}
            output_node_id: Node ID to get output from (uses default if not set)

        Returns the generated image or error message.
        """
        if not settings.workflows_dir:
            return "Error: COMFY_WORKFLOWS_DIR not configured"

        # workflow_name comes from the tool caller; make sure it cannot escape
        # the configured directory via '..' segments or an absolute path.
        base_dir = Path(settings.workflows_dir).resolve()
        wf_path = (base_dir / workflow_name).resolve()
        if not wf_path.is_relative_to(base_dir):
            return f"Error: Workflow '{workflow_name}' is outside the workflows directory"
        if not wf_path.is_file():
            return f"Error: Workflow '{workflow_name}' not found"

        if ctx:
            ctx.info(f"Loading workflow: {workflow_name}")

        try:
            workflow = _load_workflow_file(wf_path)
        except ValueError as e:
            return f"Error: Failed to load workflow '{workflow_name}': {e}"

        # Check for UI format workflows
        if is_ui_format(workflow):
            return (
                f"Error: Workflow '{workflow_name}' is in UI format (has nodes/widgets_values). "
                "UI format uses positional arrays that can cause parameter misalignment errors. "
                "Please re-export the workflow from ComfyUI using 'Export (API Format)' or use "
                "convert_workflow_to_ui() to create a UI version from an API format workflow."
            )

        # Apply input overrides. Node IDs that are not in the workflow are
        # skipped silently, as before.
        if inputs:
            for node_id, values in inputs.items():
                node = workflow.get(node_id)
                if not isinstance(node, dict):
                    continue
                # Create the inputs dict on demand instead of raising KeyError.
                node_inputs = node.setdefault("inputs", {})
                if isinstance(values, dict):
                    node_inputs.update(values)
                elif "text" in node_inputs:
                    # Simple value - try to set text input
                    node_inputs["text"] = values

        out_node = output_node_id or settings.output_node_id
        if not out_node:
            return "Error: No output_node_id specified"

        return _execute_workflow(workflow, out_node, ctx)

    @mcp.tool()
    def execute_workflow(
        workflow: dict = Field(description="Complete workflow dict"),
        output_node_id: str = Field(description="Node ID to get output from"),
        ctx: Context = None,
    ) -> Image | str:
        """Execute an arbitrary workflow dict.

        Args:
            workflow: Workflow dict in ComfyUI API format
            output_node_id: Node ID that outputs the final image

        Returns the generated image or error message.
        Use this for programmatically built workflows.
        """
        # Check for UI format workflows
        if is_ui_format(workflow):
            return (
                "Error: Workflow is in UI format (has nodes/widgets_values). "
                "UI format uses positional arrays that can cause parameter misalignment errors. "
                "Please provide workflow in API format with explicit 'class_type' and 'inputs'."
            )

        if ctx:
            ctx.info("Executing custom workflow...")

        return _execute_workflow(workflow, output_node_id, ctx)

    @mcp.tool()
    def generate_image(
        prompt: str = Field(description="Text prompt for image generation"),
        ctx: Context = None,
    ) -> Image | str:
        """Generate an image using the default workflow.

        This is a simplified interface for quick image generation.
        Requires COMFY_WORKFLOW_JSON_FILE, PROMPT_NODE_ID, and OUTPUT_NODE_ID
        to be configured.

        For more control, use run_workflow() or execute_workflow().

        Args:
            prompt: Text description of the image to generate
        """
        if not settings.workflow_json_file:
            return "Error: COMFY_WORKFLOW_JSON_FILE not configured"
        if not settings.prompt_node_id:
            return "Error: PROMPT_NODE_ID not configured"
        if not settings.output_node_id:
            return "Error: OUTPUT_NODE_ID not configured"

        try:
            workflow = _load_workflow_file(settings.workflow_json_file)
        except ValueError as e:
            return f"Error: Failed to load default workflow: {e}"

        # Same guard as run_workflow()/execute_workflow(): a UI-format file has
        # no per-node "inputs" to patch and would fail further down anyway.
        if is_ui_format(workflow):
            return (
                "Error: Default workflow is in UI format (has nodes/widgets_values). "
                "Please re-export the workflow from ComfyUI using 'Export (API Format)'."
            )

        prompt_node = workflow.get(settings.prompt_node_id)
        if not isinstance(prompt_node, dict):
            return f"Error: Prompt node '{settings.prompt_node_id}' not found in workflow"
        prompt_node.setdefault("inputs", {})["text"] = prompt

        if ctx:
            ctx.info(f"Generating: {prompt[:50]}...")

        return _execute_workflow(workflow, settings.output_node_id, ctx)

    @mcp.tool()
    def submit_workflow(
        workflow: dict = Field(description="Workflow to submit"),
        ctx: Context = None,
    ) -> dict:
        """Submit a workflow without waiting for completion.

        Args:
            workflow: Workflow dict to execute

        Returns the prompt_id for tracking.
        Use get_history() or get_prompt_status() to check completion.
        """
        if ctx:
            ctx.info("Submitting workflow...")

        status, resp = comfy_post("/prompt", {"prompt": workflow})

        if status != 200:
            return ErrorResponse(
                error=f"Submit failed: status {status}",
                code="SUBMIT_FAILED",
                details=resp,
            ).model_dump()

        return {
            "prompt_id": resp.get("prompt_id"),
            "number": resp.get("number"),
            "node_errors": resp.get("node_errors", {}),
        }

    @mcp.tool()
    def get_prompt_status(
        prompt_id: str = Field(description="Prompt ID to check"),
        ctx: Context = None,
    ) -> dict:
        """Get the status of a submitted prompt.

        Args:
            prompt_id: The prompt ID from submit_workflow()

        Returns status information including completion state.
        """
        if ctx:
            ctx.info(f"Checking status: {prompt_id}")

        # Broad catch is deliberate: this is the tool boundary and every
        # failure is reported to the caller as a structured error.
        try:
            history = comfy_get(f"/history/{prompt_id}")
            if prompt_id not in history:
                return {"status": "pending", "completed": False}

            entry = history[prompt_id]
            status = entry.get("status", {})
            return {
                "status": status.get("status_str", "unknown"),
                "completed": status.get("completed", False),
                "messages": status.get("messages", []),
                "has_outputs": bool(entry.get("outputs")),
            }
        except Exception as e:
            return ErrorResponse.unavailable(str(e)).model_dump()

    @mcp.tool()
    def get_result_image(
        prompt_id: str = Field(description="Prompt ID"),
        output_node_id: str = Field(description="Output node ID"),
        ctx: Context = None,
    ) -> Image | str:
        """Get the result image from a completed prompt.

        Args:
            prompt_id: The prompt ID from submit_workflow()
            output_node_id: Node ID that produced the image

        Returns the image if available, or error message.
        """
        if ctx:
            ctx.info(f"Getting result: {prompt_id}")

        # Broad catch is deliberate: this is the tool boundary and every
        # failure is reported to the caller as an error string.
        try:
            history = comfy_get(f"/history/{prompt_id}")
            if prompt_id not in history:
                return "Prompt not found in history"

            entry = history[prompt_id]
            status = entry.get("status", {})
            if not status.get("completed"):
                return "Prompt not yet completed"

            outputs = entry.get("outputs", {})
            if output_node_id not in outputs:
                return f"No output from node {output_node_id}"

            images = outputs[output_node_id].get("images", [])
            if not images:
                return "No images in output"

            # Download image
            url = get_file_url(settings.comfy_url, images[0])
            # Kept as a function-scope import, as in the original.
            from ..api import download_file

            image_data = download_file(url)
            if image_data:
                return Image(data=image_data, format="png")
            return "Failed to download image"
        except Exception as e:
            return f"Error: {e}"


def _execute_workflow(workflow: dict, output_node_id: str, ctx: Context | None) -> Image | str:
    """Internal function to execute workflow and return result.

    Submits the workflow to the /prompt endpoint, polls until the requested
    output node yields image data, and returns either the image (or its URL
    when settings.output_mode is "url") or a human-readable error string.

    Args:
        workflow: Workflow dict to submit.
        output_node_id: Node ID whose image output is returned.
        ctx: Optional MCP context used for progress messages.
    """
    # Submit workflow
    status, resp_data = comfy_post("/prompt", {"prompt": workflow})

    if status != 200:
        # The shape of a failed response body is not visible here; only call
        # .get() when it is actually a dict.
        if isinstance(resp_data, dict):
            error_msg = resp_data.get("error", f"status {status}")
        else:
            error_msg = resp_data or f"status {status}"
        return f"Failed to submit workflow: {error_msg}"

    if not isinstance(resp_data, dict):
        return "Failed to get prompt_id from response"

    prompt_id = resp_data.get("prompt_id")
    if not prompt_id:
        node_errors = resp_data.get("node_errors", {})
        if node_errors:
            return f"Workflow validation failed:\n{json.dumps(node_errors, indent=2)}"
        return "Failed to get prompt_id from response"

    if ctx:
        ctx.info(f"Submitted: {prompt_id}")

    # Poll callback for progress logging
    def on_poll(attempt: int, max_attempts: int):
        if ctx and attempt % 5 == 0:
            ctx.info(f"Waiting... ({attempt}/{max_attempts})")

    # Poll for result
    image_data = poll_for_result(prompt_id, output_node_id, on_poll=on_poll)

    if not image_data:
        return "Failed to generate image. Use get_queue_status() and get_history() to debug."

    if ctx:
        ctx.info("Image generated successfully")

    if settings.output_mode.lower() == "url":
        # Return URL instead of image data. If the history entry carries no
        # image record for this node, fall through to returning the raw image.
        history = comfy_get(f"/history/{prompt_id}")
        images = (
            history.get(prompt_id, {})
            .get("outputs", {})
            .get(output_node_id, {})
            .get("images", [])
        )
        if images:
            # NOTE(review): get_result_image() passes the image dict straight
            # to get_file_url(), while this path passes a URL-encoded query
            # string. Confirm which form get_file_url() expects.
            url_values = urllib.parse.urlencode(images[0])
            return get_file_url(settings.comfy_url_external, url_values)

    return Image(data=image_data, format="png")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/IO-AtelierTech/comfyui-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.