Skip to main content
Glama

execute_workflow

Execute ComfyUI workflows by converting DSL to JSON and submitting to server. Optionally wait for completion and retrieve results.

Instructions

Execute a DSL workflow on ComfyUI server.

Converts DSL to JSON and submits to ComfyUI for execution. Can optionally wait for completion and return results.

Args: dsl: Workflow content in DSL format server_address: ComfyUI server address (default: 127.0.0.1:8188) wait_for_completion: Whether to wait for execution to complete timeout_seconds: Maximum time to wait for completion

Returns: Execution result with prompt_id, status, and outputs if completed

Examples: execute_workflow(dsl_content) execute_workflow(dsl_content, server_address="192.168.1.100:8188") execute_workflow(dsl_content, wait_for_completion=False)

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dslYes
server_addressNo127.0.0.1:8188
wait_for_completionNo
timeout_secondsNo

Implementation Reference

  • The core handler function for the 'execute_workflow' tool. It parses the DSL input, converts it to ComfyUI JSON format, submits the workflow to the ComfyUI server via API, and optionally waits for completion while polling the status and extracting image outputs.
    @mcp.tool async def execute_workflow( ctx: Context, dsl: str, server_address: str = DEFAULT_COMFYUI_SERVER, wait_for_completion: bool = True, timeout_seconds: int = 300 ) -> Dict[str, Any]: """Execute a DSL workflow on ComfyUI server. Converts DSL to JSON and submits to ComfyUI for execution. Can optionally wait for completion and return results. Args: dsl: Workflow content in DSL format server_address: ComfyUI server address (default: 127.0.0.1:8188) wait_for_completion: Whether to wait for execution to complete timeout_seconds: Maximum time to wait for completion Returns: Execution result with prompt_id, status, and outputs if completed Examples: execute_workflow(dsl_content) execute_workflow(dsl_content, server_address="192.168.1.100:8188") execute_workflow(dsl_content, wait_for_completion=False) """ await ctx.info(f"Executing workflow on {server_address}") try: # Convert DSL to JSON workflow await ctx.info("Converting DSL to ComfyUI JSON format...") parser = DSLParser() workflow_ast = parser.parse(dsl) converter = DslToJsonConverter() workflow_json = converter.convert(workflow_ast) await ctx.info(f"✓ Converted to JSON ({len(workflow_json)} nodes)") # Initialize ComfyUI client client = ComfyUIClient(server_address) # Submit workflow await ctx.info("Submitting workflow to ComfyUI...") prompt_id = await client.queue_prompt(workflow_json) await ctx.info(f"✓ Submitted with prompt_id: {prompt_id}") result = { "prompt_id": prompt_id, "server_address": server_address, "submitted_at": datetime.now().isoformat(), "status": "queued" } if not wait_for_completion: await ctx.info("Not waiting for completion (use get_job_status to check)") return result # Wait for completion await ctx.info(f"Waiting for completion (timeout: {timeout_seconds}s)...") start_time = asyncio.get_event_loop().time() while True: # Check if timeout exceeded if asyncio.get_event_loop().time() - start_time > timeout_seconds: result["status"] = "timeout" result["message"] = f"Execution exceeded {timeout_seconds}s timeout" await ctx.info("⚠️ Execution timed out") return result # Check execution status history = await client.get_history(prompt_id) if prompt_id in history: execution = history[prompt_id] status = execution.get("status", {}) if status.get("completed", False): result["status"] = "completed" if status.get("status_str") == "success" else "failed" result["completed_at"] = datetime.now().isoformat() result["execution_time"] = f"{asyncio.get_event_loop().time() - start_time:.1f}s" # Extract outputs outputs = execution.get("outputs", {}) result["outputs"] = {} for node_id, output in outputs.items(): if "images" in output: result["outputs"][node_id] = { "type": "images", "images": output["images"] } if result["status"] == "completed": await ctx.info(f"✅ Workflow completed successfully in {result['execution_time']}") if result["outputs"]: total_images = sum(len(out.get("images", [])) for out in result["outputs"].values()) await ctx.info(f"Generated {total_images} image(s)") else: await ctx.info(f"❌ Workflow failed: {status.get('messages', [])}") return result # Wait before checking again await asyncio.sleep(2) except Exception as e: raise ToolError(f"Failed to execute workflow: {e}")
  • Helper class ComfyUIClient that provides the API methods used by execute_workflow: queue_prompt to submit workflows, get_history to check status, get_queue_status, and download_image.
    class ComfyUIClient: """Client for ComfyUI API operations""" def __init__(self, server_address: str = DEFAULT_COMFYUI_SERVER): self.server_address = server_address self.base_url = f"http://{server_address}" self.ws_url = f"ws://{server_address}/ws" self.client_id = str(uuid.uuid4()) async def queue_prompt(self, workflow: Dict[str, Any]) -> str: """Submit workflow for execution""" data = { "prompt": workflow, "client_id": self.client_id } async with httpx.AsyncClient() as client: response = await client.post( f"{self.base_url}/prompt", json=data, headers={'Content-Type': 'application/json'} ) response.raise_for_status() result = response.json() if "error" in result: raise ToolError(f"ComfyUI error: {result['error']}") return result.get("prompt_id") async def get_history(self, prompt_id: str) -> Dict[str, Any]: """Get execution history and results""" async with httpx.AsyncClient() as client: response = await client.get(f"{self.base_url}/history/{prompt_id}") response.raise_for_status() return response.json() async def get_queue_status(self) -> Dict[str, Any]: """Get current queue status""" async with httpx.AsyncClient() as client: response = await client.get(f"{self.base_url}/queue") response.raise_for_status() return response.json() async def download_image(self, filename: str, subfolder: str = "", folder_type: str = "output") -> bytes: """Download generated image""" params = { "filename": filename, "subfolder": subfolder, "type": folder_type } async with httpx.AsyncClient() as client: response = await client.get(f"{self.base_url}/view", params=params) response.raise_for_status() return response.content
  • The @mcp.tool decorator registers the execute_workflow function with the FastMCP server instance 'mcp'.
    @mcp.tool

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/christian-byrne/comfy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server