get_job_status
Check execution status and retrieve generated images from ComfyUI workflows using prompt IDs to monitor job progress and access outputs.
Instructions
Get status and results of a ComfyUI job.
Checks execution status and optionally downloads generated images.
Args:
- prompt_id: The prompt ID returned by execute_workflow
- server_address: ComfyUI server address
- download_images: Whether to download generated images
- image_save_path: Directory to save images (relative to workflows/)
Returns: Job status with completion info and image paths if downloaded
Examples:
- get_job_status("12345-abcde-67890")
- get_job_status("12345-abcde-67890", download_images=True)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
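| prompt_id | Yes | The prompt ID returned by execute_workflow | |
| server_address | No | ComfyUI server address | 127.0.0.1:8188 |
| download_images | No | Whether to download generated images | False |
| image_save_path | No | Directory to save images (relative to workflows/) | outputs |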
Implementation Reference
# comfy_mcp/mcp/server.py:1046-1153 (handler)
# The core handler function for the 'get_job_status' tool. Decorated with
# @mcp.tool for registration. It fetches job status from the ComfyUI server
# using ComfyUIClient, handles queued/running/completed/failed states,
# extracts image outputs, and optionally downloads images to the specified
# path.


def _find_in_queue(queue: Dict[str, Any], prompt_id: str):
    """Return a status payload if ``prompt_id`` is in the queue snapshot, else None.

    Running entries are checked before pending ones. ``position`` is 0 for a
    running job and the 1-based index within ``queue_pending`` otherwise.
    """
    # Each queue entry is indexed positionally; element 1 is compared against
    # the prompt ID (presumably ComfyUI's [number, prompt_id, ...] layout).
    for item in queue.get("queue_running", []):
        if item[1] == prompt_id:
            return {"prompt_id": prompt_id, "status": "running", "position": 0}

    for position, item in enumerate(queue.get("queue_pending", []), start=1):
        if item[1] == prompt_id:
            return {"prompt_id": prompt_id, "status": "queued", "position": position}

    return None


def _job_state(status: Dict[str, Any]) -> str:
    """Map a history ``status`` dict to "running", "completed" or "failed"."""
    if not status.get("completed", False):
        return "running"
    return "completed" if status.get("status_str") == "success" else "failed"


def _local_image_name(prompt_id: str, node_id: Any, index: int, remote_name: Any) -> str:
    """Build a flat local file name for a downloaded image.

    The remote name comes from the server response, so only its final path
    component is kept and any separator left in the composed name is replaced.
    This keeps the file inside the save directory. For ordinary names the
    result is ``{prompt_id}_{node_id}_{index:03d}_{remote_name}``.
    """
    base = str(remote_name).replace("\\", "/").rsplit("/", 1)[-1]
    composed = f"{prompt_id}_{node_id}_{index:03d}_{base}"
    return composed.replace("/", "_").replace("\\", "_")


@mcp.tool
async def get_job_status(
    ctx: Context,
    prompt_id: str,
    server_address: str = DEFAULT_COMFYUI_SERVER,
    download_images: bool = False,
    image_save_path: str = "outputs"
) -> Dict[str, Any]:
    """Get status and results of a ComfyUI job.

    Checks execution status and optionally downloads generated images.

    Args:
        prompt_id: The prompt ID returned by execute_workflow
        server_address: ComfyUI server address
        download_images: Whether to download generated images
        image_save_path: Directory to save images (relative to workflows/)

    Returns:
        Job status with completion info and image paths if downloaded

    Raises:
        ToolError: If any step (history/queue lookup, download, file write) fails.

    Examples:
        get_job_status("12345-abcde-67890")
        get_job_status("12345-abcde-67890", download_images=True)
    """
    await ctx.info(f"Checking job status for {prompt_id}")

    try:
        client = ComfyUIClient(server_address)

        # Get execution history
        history = await client.get_history(prompt_id)

        if prompt_id not in history:
            # Not in history yet: it may still be waiting or executing.
            queue = await client.get_queue_status()
            queued = _find_in_queue(queue, prompt_id)
            if queued is not None:
                return queued

            return {
                "prompt_id": prompt_id,
                "status": "not_found",
                "message": "Job not found in history or queue"
            }

        # Parse execution results
        execution = history[prompt_id]
        status = execution.get("status", {})

        result: Dict[str, Any] = {
            "prompt_id": prompt_id,
            "status": _job_state(status),
            "messages": status.get("messages", [])
        }

        # Outputs are only reported once the job is marked completed
        # (this includes jobs that completed unsuccessfully).
        if status.get("completed", False):
            outputs = execution.get("outputs", {})
            result["outputs"] = {
                node_id: {
                    "type": "images",
                    "count": len(output["images"]),
                    "images": output["images"]
                }
                for node_id, output in outputs.items()
                if "images" in output
            }

            if download_images and result["outputs"]:
                await ctx.info("Downloading generated images...")

                # Create save directory
                # NOTE(review): the path is used as given, with no base
                # directory joined here; confirm it really resolves
                # "relative to workflows/" as the docstring says.
                save_dir = Path(image_save_path)
                save_dir.mkdir(parents=True, exist_ok=True)

                downloaded_files = []
                for node_id, output in result["outputs"].items():
                    for i, image_info in enumerate(output["images"]):
                        # Fall back to download_image's own defaults when the
                        # server omits the optional location fields.
                        image_data = await client.download_image(
                            image_info["filename"],
                            image_info.get("subfolder", ""),
                            image_info.get("type", "output")
                        )

                        # Save with descriptive name
                        file_path = save_dir / _local_image_name(
                            prompt_id, node_id, i, image_info["filename"]
                        )
                        file_path.write_bytes(image_data)
                        downloaded_files.append(str(file_path))

                result["downloaded_files"] = downloaded_files
                await ctx.info(f"✓ Downloaded {len(downloaded_files)} image(s) to {save_dir}")

        return result

    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise ToolError(f"Failed to get job status: {e}") from e
# comfy_mcp/mcp/server.py:877-933 (helper)
# ComfyUIClient helper class providing the API client methods used by
# get_job_status: get_history for execution results, get_queue_status for
# queue position, and download_image for fetching generated images.
class ComfyUIClient:
    """Client for ComfyUI API operations.

    Each call opens a short-lived ``httpx.AsyncClient``; no connection is
    kept between calls.
    """

    def __init__(self, server_address: str = DEFAULT_COMFYUI_SERVER):
        """Derive the HTTP and WebSocket base URLs from ``server_address``.

        Args:
            server_address: ``host:port`` of the ComfyUI server. An address
                that already carries an ``http://`` or ``https://`` scheme
                is also accepted; ``https`` selects ``wss`` for the
                WebSocket URL.
        """
        self.server_address = server_address

        address = server_address.rstrip("/")
        if address.startswith("https://"):
            http_scheme, ws_scheme = "https", "wss"
            host = address[len("https://"):]
        elif address.startswith("http://"):
            http_scheme, ws_scheme = "http", "ws"
            host = address[len("http://"):]
        else:
            # Bare host:port, the original and default form.
            http_scheme, ws_scheme = "http", "ws"
            host = address

        self.base_url = f"{http_scheme}://{host}"
        self.ws_url = f"{ws_scheme}://{host}/ws"
        self.client_id = str(uuid.uuid4())

    async def _get_json(self, path: str) -> Dict[str, Any]:
        """GET ``base_url + path`` and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: On a non-2xx response.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.base_url}{path}")
            response.raise_for_status()
            return response.json()

    async def queue_prompt(self, workflow: Dict[str, Any]) -> str:
        """Submit workflow for execution.

        Args:
            workflow: Workflow graph sent as the ``prompt`` field.

        Returns:
            The ``prompt_id`` field of the server's response.

        Raises:
            ToolError: If the response body is JSON containing an ``error`` key.
            httpx.HTTPStatusError: On a non-2xx response without such a body.
        """
        data = {
            "prompt": workflow,
            "client_id": self.client_id
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/prompt",
                json=data,
                headers={'Content-Type': 'application/json'}
            )

            try:
                result = response.json()
            except ValueError:
                # Body is not JSON: report the HTTP failure if there is one,
                # otherwise surface the decode error as before.
                response.raise_for_status()
                raise

            # Inspect the body before the status code. ComfyUI is understood
            # to send its error payload with a non-2xx status, so checking
            # the status first would hide the detail (confirm against the
            # server's /prompt route).
            if isinstance(result, dict) and "error" in result:
                raise ToolError(f"ComfyUI error: {result['error']}")

            response.raise_for_status()
            return result.get("prompt_id")

    async def get_history(self, prompt_id: str) -> Dict[str, Any]:
        """Get execution history and results"""
        return await self._get_json(f"/history/{prompt_id}")

    async def get_queue_status(self) -> Dict[str, Any]:
        """Get current queue status"""
        return await self._get_json("/queue")

    async def download_image(self, filename: str, subfolder: str = "", folder_type: str = "output") -> bytes:
        """Download generated image.

        Args:
            filename: Image file name as reported by the server.
            subfolder: Subfolder the image lives in.
            folder_type: Sent as the ``type`` query parameter.

        Returns:
            The raw response body.

        Raises:
            httpx.HTTPStatusError: On a non-2xx response.
        """
        params = {
            "filename": filename,
            "subfolder": subfolder,
            "type": folder_type
        }

        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.base_url}/view", params=params)
            response.raise_for_status()
            return response.content