Skip to main content
Glama

execute_workflow

Execute ComfyUI workflows by converting DSL to JSON and submitting to server. Optionally wait for completion and return results.

Instructions

Execute a DSL workflow on ComfyUI server.

Converts DSL to JSON and submits to ComfyUI for execution. Can optionally wait for completion and return results.

Args:
- `dsl`: Workflow content in DSL format
- `server_address`: ComfyUI server address (default: 127.0.0.1:8188)
- `wait_for_completion`: Whether to wait for execution to complete
- `timeout_seconds`: Maximum time to wait for completion

Returns: Execution result with prompt_id, status, and outputs if completed

Examples:
- `execute_workflow(dsl_content)`
- `execute_workflow(dsl_content, server_address="192.168.1.100:8188")`
- `execute_workflow(dsl_content, wait_for_completion=False)`

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| dsl | Yes | Workflow content in DSL format | — |
| server_address | No | ComfyUI server address | 127.0.0.1:8188 |
| wait_for_completion | No | Whether to wait for execution to complete | true |
| timeout_seconds | No | Maximum time to wait for completion | 300 |

Implementation Reference

  • The core handler for the 'execute_workflow' tool. Decorated with @mcp.tool for FastMCP registration. Handles DSL to JSON conversion, ComfyUI API interaction, and execution monitoring.
    @mcp.tool
    async def execute_workflow(
        ctx: Context,
        dsl: str,
        server_address: str = DEFAULT_COMFYUI_SERVER,
        wait_for_completion: bool = True,
        timeout_seconds: int = 300
    ) -> Dict[str, Any]:
        """Execute a DSL workflow on ComfyUI server.
        
        Converts DSL to JSON and submits to ComfyUI for execution.
        Can optionally wait for completion and return results.
        
        Args:
            dsl: Workflow content in DSL format
            server_address: ComfyUI server address (default: 127.0.0.1:8188)
            wait_for_completion: Whether to wait for execution to complete
            timeout_seconds: Maximum time to wait for completion
        
        Returns:
            Execution result with prompt_id, status, and outputs if completed
        
        Raises:
            ToolError: If DSL conversion, submission, or status polling fails.
        
        Examples:
            execute_workflow(dsl_content)
            execute_workflow(dsl_content, server_address="192.168.1.100:8188")
            execute_workflow(dsl_content, wait_for_completion=False)
        """
        await ctx.info(f"Executing workflow on {server_address}")
        
        try:
            # Convert DSL text -> AST -> ComfyUI prompt JSON.
            await ctx.info("Converting DSL to ComfyUI JSON format...")
            parser = DSLParser()
            workflow_ast = parser.parse(dsl)
            
            converter = DslToJsonConverter()
            workflow_json = converter.convert(workflow_ast)
            
            await ctx.info(f"✓ Converted to JSON ({len(workflow_json)} nodes)")
            
            # Submit the converted workflow to the ComfyUI HTTP API.
            client = ComfyUIClient(server_address)
            
            await ctx.info("Submitting workflow to ComfyUI...")
            prompt_id = await client.queue_prompt(workflow_json)
            await ctx.info(f"✓ Submitted with prompt_id: {prompt_id}")
            
            result = {
                "prompt_id": prompt_id,
                "server_address": server_address,
                "submitted_at": datetime.now().isoformat(),
                "status": "queued"
            }
            
            if not wait_for_completion:
                await ctx.info("Not waiting for completion (use get_job_status to check)")
                return result
            
            # Poll the /history endpoint until the prompt finishes or the
            # caller-supplied timeout elapses.
            await ctx.info(f"Waiting for completion (timeout: {timeout_seconds}s)...")
            
            # get_running_loop() is the supported way to reach the loop clock
            # from inside a coroutine (get_event_loop() is deprecated here).
            loop = asyncio.get_running_loop()
            start_time = loop.time()
            while True:
                # Check if timeout exceeded
                if loop.time() - start_time > timeout_seconds:
                    result["status"] = "timeout"
                    result["message"] = f"Execution exceeded {timeout_seconds}s timeout"
                    await ctx.info("⚠️ Execution timed out")
                    return result
                
                # Check execution status
                history = await client.get_history(prompt_id)
                
                if prompt_id in history:
                    execution = history[prompt_id]
                    status = execution.get("status", {})
                    
                    if status.get("completed", False):
                        result["status"] = "completed" if status.get("status_str") == "success" else "failed"
                        result["completed_at"] = datetime.now().isoformat()
                        result["execution_time"] = f"{loop.time() - start_time:.1f}s"
                        
                        # Collect image outputs only; other node output types
                        # are intentionally omitted from the result payload.
                        outputs = execution.get("outputs", {})
                        result["outputs"] = {}
                        
                        for node_id, output in outputs.items():
                            if "images" in output:
                                result["outputs"][node_id] = {
                                    "type": "images",
                                    "images": output["images"]
                                }
                        
                        if result["status"] == "completed":
                            await ctx.info(f"✅ Workflow completed successfully in {result['execution_time']}")
                            if result["outputs"]:
                                total_images = sum(len(out.get("images", [])) for out in result["outputs"].values())
                                await ctx.info(f"Generated {total_images} image(s)")
                        else:
                            await ctx.info(f"❌ Workflow failed: {status.get('messages', [])}")
                        
                        return result
                
                # Back off between polls to avoid hammering the server.
                await asyncio.sleep(2)
                
        except ToolError:
            # Already a well-formed tool error (e.g. from queue_prompt);
            # don't double-wrap the message.
            raise
        except Exception as e:
            # Chain the original cause so the traceback stays diagnosable.
            raise ToolError(f"Failed to execute workflow: {e}") from e
  • Helper class ComfyUIClient that provides the low-level API calls to ComfyUI server (queue_prompt, get_history, etc.), used internally by the execute_workflow handler.
    class ComfyUIClient:
        """Client for ComfyUI API operations.
        
        Thin async wrapper over ComfyUI's HTTP endpoints (/prompt, /history,
        /queue, /view). A fresh httpx.AsyncClient is opened per call, so
        instances hold no network resources between requests.
        """
        
        def __init__(self, server_address: str = DEFAULT_COMFYUI_SERVER):
            # host:port pair, e.g. "127.0.0.1:8188"
            self.server_address = server_address
            self.base_url = f"http://{server_address}"
            self.ws_url = f"ws://{server_address}/ws"
            # Unique id so ComfyUI can attribute queued prompts to this client.
            self.client_id = str(uuid.uuid4())
        
        async def queue_prompt(self, workflow: Dict[str, Any]) -> str:
            """Submit workflow for execution.
            
            Returns:
                The prompt_id assigned by ComfyUI.
            
            Raises:
                ToolError: If ComfyUI reports an error or omits the prompt_id.
                httpx.HTTPStatusError: On non-2xx HTTP responses.
            """
            data = {
                "prompt": workflow,
                "client_id": self.client_id
            }
            
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.base_url}/prompt",
                    json=data,
                    headers={'Content-Type': 'application/json'}
                )
                response.raise_for_status()
                result = response.json()
                
                if "error" in result:
                    raise ToolError(f"ComfyUI error: {result['error']}")
                
                prompt_id = result.get("prompt_id")
                # The annotated return type is str; surfacing a missing id
                # here beats a confusing None-related failure downstream.
                if not prompt_id:
                    raise ToolError("ComfyUI did not return a prompt_id")
                return prompt_id
        
        async def get_history(self, prompt_id: str) -> Dict[str, Any]:
            """Get execution history and results for a prompt_id."""
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.base_url}/history/{prompt_id}")
                response.raise_for_status()
                return response.json()
        
        async def get_queue_status(self) -> Dict[str, Any]:
            """Get current queue status."""
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.base_url}/queue")
                response.raise_for_status()
                return response.json()
        
        async def download_image(self, filename: str, subfolder: str = "", folder_type: str = "output") -> bytes:
            """Download generated image bytes via the /view endpoint.
            
            Args:
                filename: Image filename as reported in workflow outputs.
                subfolder: Optional subfolder the image was saved under.
                folder_type: ComfyUI folder category (default "output").
            """
            params = {
                "filename": filename,
                "subfolder": subfolder,
                "type": folder_type
            }
            
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.base_url}/view", params=params)
                response.raise_for_status()
                return response.content
  • The @mcp.tool decorator on the execute_workflow function registers it as an MCP tool with the FastMCP server instance 'mcp'.
    @mcp.tool

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/christian-byrne/comfy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server