"""Workflow management tools for ComfyUI MCP Server
Thin MCP wrapper around WorkflowOrchestrator for workflow operations.
NO business logic - delegates to orchestrator.
FastMCP v3 integration:
- Exposes workflows as Resources for inspection
- Provides Tools for execution
- Supports Context for progress reporting
"""
from typing import Any
from fastmcp import FastMCP
from src.auth.base import ComfyAuth
from src.orchestrators.asset import AssetOrchestrator
from src.orchestrators.defaults import DefaultsManager
from src.orchestrators.workflow import ParameterExtractor, WorkflowOrchestrator
from src.tools.helpers import register_and_build_response
from src.utils import get_global_logger
from src.utils.context import generate_correlation_id, set_correlation_id
logger = get_global_logger("MCP_Server.tools.workflow")
def register_workflow_tools(
mcp: FastMCP,
auth: ComfyAuth,
workflow_orchestrator: WorkflowOrchestrator,
defaults_manager: DefaultsManager,
asset_orchestrator: AssetOrchestrator,
):
"""Register workflow tools with the MCP server
Registers both Resources (for inspection) and Tools (for execution)
following FastMCP v3 best practices.
Args:
mcp: FastMCP server instance
auth: Authentication for ComfyUI API calls
workflow_orchestrator: Orchestrator for workflow operations
defaults_manager: Orchestrator for defaults management
asset_orchestrator: Orchestrator for asset operations
"""
# ========================================================================
# RESOURCES - Allow agents to inspect workflows before executing
# ========================================================================
@mcp.resource("workflows://catalog")
def get_workflow_catalog_resource() -> str:
"""List all available ComfyUI workflows with their metadata.
Returns a formatted catalog showing workflow IDs, names, descriptions,
and available parameters for each workflow.
"""
catalog = workflow_orchestrator.get_workflow_catalog()
# Format as markdown for readability
lines = ["# Available ComfyUI Workflows\n"]
lines.append(f"Total workflows: {len(catalog)}\n")
lines.append(f"Workflow directory: {workflow_orchestrator.workflows_dir}\n")
for workflow in catalog:
lines.append(f"## {workflow['name']}")
lines.append(f"**ID:** `{workflow['id']}`")
lines.append(f"**Description:** {workflow['description']}")
if workflow["available_inputs"]:
lines.append("**Parameters:**")
for param_name, param_info in workflow["available_inputs"].items():
required = "required" if param_info["required"] else "optional"
lines.append(
f" - `{param_name}` ({param_info['type']}, {required}): {param_info['description']}"
)
else:
lines.append("**Parameters:** None (static workflow)")
lines.append("") # Blank line between workflows
return "\n".join(lines)
@mcp.resource("workflows://{workflow_id}/definition")
def get_workflow_definition_resource(workflow_id: str) -> dict:
"""Get the complete JSON definition for a specific workflow.
Args:
workflow_id: Workflow identifier (e.g., "generate_image")
Returns:
Complete workflow JSON template
"""
workflow = workflow_orchestrator.load_workflow(workflow_id)
if not workflow:
return {"error": f"Workflow '{workflow_id}' not found"}
return workflow
@mcp.resource("workflows://{workflow_id}/parameters")
def get_workflow_parameters_resource(workflow_id: str) -> dict:
"""Get the parameter schema for a specific workflow.
Args:
workflow_id: Workflow identifier (e.g., "generate_image")
Returns:
Dictionary mapping parameter names to their schemas
"""
workflow = workflow_orchestrator.load_workflow(workflow_id)
if not workflow:
return {"error": f"Workflow '{workflow_id}' not found"}
params = ParameterExtractor.extract_parameters(workflow)
return {
"workflow_id": workflow_id,
"parameters": {
name: {
"type": param.annotation.__name__,
"required": param.required,
"description": param.description,
"placeholder": param.placeholder,
"bindings": [
{"node_id": node_id, "input_name": input_name}
for node_id, input_name in param.bindings
],
}
for name, param in params.items()
},
"parameter_count": len(params),
}
# ========================================================================
# TOOLS - Execute workflows
# ========================================================================
@mcp.tool()
def list_workflows() -> dict:
"""List all available workflows in the workflow directory.
Returns a catalog of workflows with their IDs, names, descriptions,
available inputs, and optional metadata.
"""
catalog = workflow_orchestrator.get_workflow_catalog()
return {
"workflows": catalog,
"count": len(catalog),
"workflow_dir": str(workflow_orchestrator.workflows_dir),
}
@mcp.tool()
async def run_workflow(
workflow_id: str,
overrides: dict[str, Any] | None = None,
options: dict[str, Any] | None = None,
return_inline_preview: bool = False,
) -> dict:
"""Run a saved ComfyUI workflow with constrained parameter overrides.
Args:
workflow_id: The workflow ID (filename stem, e.g., "generate_image")
overrides: Optional dict of parameter overrides (e.g., {"prompt": "a cat", "width": 1024})
options: Optional dict of execution options (reserved for future use)
return_inline_preview: If True, include a small thumbnail base64 in response (256px, ~100KB)
Returns:
Result with asset_url, workflow_id, and execution metadata. If return_inline_preview=True,
also includes inline_preview_base64 for immediate viewing.
"""
if overrides is None:
overrides = {}
# Generate and set correlation ID for request tracing
correlation_id = generate_correlation_id()
set_correlation_id(correlation_id)
logger.info(f"[{correlation_id}] Running workflow: {workflow_id}")
try:
# Get Context if available (FastMCP v3 will inject it)
ctx = None
# Context would be injected by FastMCP framework if tool signature accepts it
# For now, we pass None and the orchestrator handles it gracefully
# Execute workflow using orchestrator (with polling and context support)
result = await workflow_orchestrator.execute_workflow(
auth=auth,
workflow_id=workflow_id,
parameters=overrides,
ctx=ctx, # Pass context for progress reporting
)
if "error" in result:
return result
# Register asset and build response
return register_and_build_response(
result,
workflow_id,
asset_orchestrator,
tool_name="run_workflow",
return_inline_preview=return_inline_preview,
session_id=None, # Session tracking can be added via request context
)
except Exception as exc:
logger.exception("Workflow '%s' failed", workflow_id)
return {"error": str(exc)}