execute_workflow
Execute ComfyUI workflows by converting DSL to JSON and submitting to server. Optionally wait for completion and return results.
Instructions
Execute a DSL workflow on ComfyUI server.
Converts DSL to JSON and submits to ComfyUI for execution. Can optionally wait for completion and return results.
Args:
- dsl: Workflow content in DSL format
- server_address: ComfyUI server address (default: 127.0.0.1:8188)
- wait_for_completion: Whether to wait for execution to complete
- timeout_seconds: Maximum time to wait for completion
Returns: Execution result with prompt_id, status, and outputs if completed
Examples:
- execute_workflow(dsl_content)
- execute_workflow(dsl_content, server_address="192.168.1.100:8188")
- execute_workflow(dsl_content, wait_for_completion=False)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
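| dsl | Yes | Workflow content in DSL format | |
| server_address | No | ComfyUI server address | 127.0.0.1:8188 |
| wait_for_completion | No | Whether to wait for execution to complete | True |
| timeout_seconds | No | Maximum time to wait for completion (seconds) | 300 |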
Implementation Reference
# comfy_mcp/mcp/server.py:935-1044 (handler)
# The core handler for the 'execute_workflow' tool. Decorated with @mcp.tool for
# FastMCP registration. Handles DSL to JSON conversion, ComfyUI API interaction,
# and execution monitoring.
@mcp.tool
async def execute_workflow(
    ctx: Context,
    dsl: str,
    server_address: str = DEFAULT_COMFYUI_SERVER,
    wait_for_completion: bool = True,
    timeout_seconds: int = 300
) -> Dict[str, Any]:
    """Execute a DSL workflow on ComfyUI server.

    Converts DSL to JSON and submits to ComfyUI for execution.
    Can optionally wait for completion and return results.

    Args:
        dsl: Workflow content in DSL format
        server_address: ComfyUI server address (default: 127.0.0.1:8188)
        wait_for_completion: Whether to wait for execution to complete
        timeout_seconds: Maximum time to wait for completion

    Returns:
        Execution result with prompt_id, status, and outputs if completed.
        ``status`` is one of "queued" (not waited for), "completed",
        "failed" or "timeout".

    Raises:
        ToolError: If conversion, submission or polling fails. The original
            exception is attached as ``__cause__``.

    Examples:
        execute_workflow(dsl_content)
        execute_workflow(dsl_content, server_address="192.168.1.100:8188")
        execute_workflow(dsl_content, wait_for_completion=False)
    """
    await ctx.info(f"Executing workflow on {server_address}")

    try:
        # Convert DSL to JSON workflow
        await ctx.info("Converting DSL to ComfyUI JSON format...")
        workflow_ast = DSLParser().parse(dsl)
        workflow_json = DslToJsonConverter().convert(workflow_ast)
        await ctx.info(f"✓ Converted to JSON ({len(workflow_json)} nodes)")

        # Initialize ComfyUI client
        client = ComfyUIClient(server_address)

        # Submit workflow
        await ctx.info("Submitting workflow to ComfyUI...")
        prompt_id = await client.queue_prompt(workflow_json)
        if not prompt_id:
            # Without an id the polling loop below could never match a history
            # entry and would only end by timing out, so fail fast instead.
            raise ToolError("ComfyUI did not return a prompt_id")
        await ctx.info(f"✓ Submitted with prompt_id: {prompt_id}")

        result = {
            "prompt_id": prompt_id,
            "server_address": server_address,
            "submitted_at": datetime.now().isoformat(),
            "status": "queued"
        }

        if not wait_for_completion:
            await ctx.info("Not waiting for completion (use get_job_status to check)")
            return result

        # Wait for completion
        await ctx.info(f"Waiting for completion (timeout: {timeout_seconds}s)...")

        # Inside a coroutine the running loop is the right clock source; fetch
        # it once rather than on every iteration.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        while True:
            # Check if timeout exceeded
            if loop.time() - start_time > timeout_seconds:
                result["status"] = "timeout"
                result["message"] = f"Execution exceeded {timeout_seconds}s timeout"
                await ctx.info("⚠️ Execution timed out")
                return result

            # Check execution status
            history = await client.get_history(prompt_id)

            if prompt_id in history:
                execution = history[prompt_id]
                status = execution.get("status", {})

                if status.get("completed", False):
                    succeeded = status.get("status_str") == "success"
                    result["status"] = "completed" if succeeded else "failed"
                    result["completed_at"] = datetime.now().isoformat()
                    result["execution_time"] = f"{loop.time() - start_time:.1f}s"

                    # Extract outputs: only nodes that produced images are reported.
                    result["outputs"] = {
                        node_id: {"type": "images", "images": output["images"]}
                        for node_id, output in execution.get("outputs", {}).items()
                        if "images" in output
                    }

                    if result["status"] == "completed":
                        await ctx.info(f"✅ Workflow completed successfully in {result['execution_time']}")
                        if result["outputs"]:
                            total_images = sum(
                                len(out.get("images", []))
                                for out in result["outputs"].values()
                            )
                            await ctx.info(f"Generated {total_images} image(s)")
                    else:
                        await ctx.info(f"❌ Workflow failed: {status.get('messages', [])}")

                    return result

            # Wait before checking again
            await asyncio.sleep(2)

    except Exception as e:
        # Chain the cause so the underlying traceback is not lost.
        raise ToolError(f"Failed to execute workflow: {e}") from e
# comfy_mcp/mcp/server.py:877-934 (helper)
# Helper class ComfyUIClient that provides the low-level API calls to ComfyUI
# server (queue_prompt, get_history, etc.), used internally by the
# execute_workflow handler.
class ComfyUIClient:
    """Client for ComfyUI API operations.

    Every call opens a short-lived ``httpx.AsyncClient``; no connection is
    kept between calls.
    """

    def __init__(self, server_address: str = DEFAULT_COMFYUI_SERVER):
        """Derive the HTTP/WebSocket URLs and a per-client id.

        Args:
            server_address: ``host:port`` of the ComfyUI server.
        """
        self.server_address = server_address
        self.base_url = f"http://{server_address}"
        self.ws_url = f"ws://{server_address}/ws"
        self.client_id = str(uuid.uuid4())

    async def _get_json(self, path: str) -> Dict[str, Any]:
        """GET ``base_url + path`` and return the decoded JSON body."""
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.base_url}{path}")
            response.raise_for_status()
            return response.json()

    async def queue_prompt(self, workflow: Dict[str, Any]) -> str:
        """Submit workflow for execution.

        Args:
            workflow: Workflow in ComfyUI JSON format.

        Returns:
            The ``prompt_id`` assigned by the server.

        Raises:
            ToolError: If the response body carries an ``"error"`` entry or
                lacks a ``prompt_id``.
            httpx.HTTPStatusError: For a non-2xx response without a JSON
                ``"error"`` entry.
        """
        data = {
            "prompt": workflow,
            "client_id": self.client_id
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/prompt",
                json=data,
                headers={'Content-Type': 'application/json'}
            )

            # Inspect the body BEFORE raise_for_status(): when an error payload
            # arrives with a non-2xx status, raising first would hide the
            # server's own message behind a generic HTTP error.
            # NOTE(review): ComfyUI appears to report validation failures this
            # way (JSON "error" with HTTP 400) - confirm for the deployed version.
            try:
                result = response.json()
            except ValueError:
                # Body is not JSON; fall through to the status check.
                result = None

            if isinstance(result, dict) and "error" in result:
                raise ToolError(f"ComfyUI error: {result['error']}")

            response.raise_for_status()

            if not isinstance(result, dict) or not result.get("prompt_id"):
                # Annotated as returning str; never hand None to callers.
                raise ToolError("ComfyUI response did not include a prompt_id")

            return result["prompt_id"]

    async def get_history(self, prompt_id: str) -> Dict[str, Any]:
        """Get execution history and results for ``prompt_id``."""
        return await self._get_json(f"/history/{prompt_id}")

    async def get_queue_status(self) -> Dict[str, Any]:
        """Get current queue status."""
        return await self._get_json("/queue")

    async def download_image(self, filename: str, subfolder: str = "", folder_type: str = "output") -> bytes:
        """Download generated image.

        Args:
            filename: Name of the image file.
            subfolder: Subfolder passed through to the ``/view`` endpoint.
            folder_type: Sent as the ``type`` query parameter.

        Returns:
            Raw response content.
        """
        params = {
            "filename": filename,
            "subfolder": subfolder,
            "type": folder_type
        }

        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.base_url}/view", params=params)
            response.raise_for_status()
            return response.content
- comfy_mcp/mcp/server.py:935-935 (registration) The @mcp.tool decorator on the execute_workflow function registers it as an MCP tool with the FastMCP server instance 'mcp'. `@mcp.tool`