"""Output manager for auto-saving MCP-generated workflow outputs.
Manages automatic saving of workflow execution outputs to executions/generations directory.
Separate from PublishManager (which handles web optimization + manifests for production).
Design Principles:
- Single Responsibility: Only handles MCP output auto-saving
- Non-blocking: Failures don't block workflow execution
- Optional web optimization: Can compress for disk efficiency
- No manifest: Simpler than PublishManager, focused on workflow tracking
- Workflow snapshots: Saves executed workflow JSON with metadata for LLM training correlation
"""
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any
from src.auth.base import ComfyAuth
from src.routes.assets import get_asset
from src.utils import get_global_logger
try:
from PIL import Image
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
logger = get_global_logger("MCP_Server.orchestrators.output")
class OutputManager:
    """Manages auto-save of MCP-generated workflow outputs.

    FastMCP v3 Best Practice: Focused, composable component with explicit dependencies.

    Design:
    - Saves to executions/generations/{workflow_id}/{filename}
    - Filename format: {workflow_id}-{timestamp}-{original_name}
    - Optional web optimization (WebP compression)
    - Path security validation (workflow_id cannot escape output_root)
    - Atomic writes (temp file + rename, temp cleaned up on failure)
    """

    # Raster formats eligible for WebP re-encoding.
    _OPTIMIZABLE_EXTS = (".png", ".jpg", ".jpeg")

    def __init__(
        self,
        auth: ComfyAuth,
        output_root: Path,
        comfyui_output_root: Path | None = None,
    ):
        """Initialize output manager.

        Args:
            auth: ComfyAuth instance for fetching assets from ComfyUI
            output_root: Root directory for saving outputs (e.g., executions/generations)
            comfyui_output_root: Optional ComfyUI output root for direct file access
        """
        self.auth = auth
        self.output_root = Path(output_root)
        self.comfyui_output_root = Path(comfyui_output_root) if comfyui_output_root else None
        # Ensure output root exists so later mkdir/resolve calls have an anchor.
        self.output_root.mkdir(parents=True, exist_ok=True)
        logger.info(f"Initialized OutputManager with output_root={self.output_root}")

    async def save_workflow_output(
        self,
        asset_info: dict[str, Any],
        workflow_id: str,
        web_optimize: bool = True,
        max_bytes: int = 600 * 1024,  # 600KB default
    ) -> tuple[bool, Path | None, str | None]:
        """Save workflow output to executions/ directory.

        Non-blocking: Returns success/failure but doesn't raise exceptions.

        Args:
            asset_info: Asset information dict with filename, subfolder, folder_type
            workflow_id: Workflow identifier (used for directory organization)
            web_optimize: If True, apply WebP compression
            max_bytes: Maximum file size for web optimization

        Returns:
            Tuple of (success: bool, saved_path: Path | None, error: str | None)

        Example:
            success, path, err = await output_manager.save_workflow_output(
                asset_info={"filename": "ComfyUI_00123.png", "subfolder": "", "folder_type": "output"},
                workflow_id="generate_image",
                web_optimize=True,
            )
            if success:
                logger.info(f"Saved to {path}")
            else:
                logger.error(f"Failed: {err}")
        """
        try:
            # Extract asset metadata
            filename = asset_info.get("filename", "")
            subfolder = asset_info.get("subfolder", "")
            folder_type = asset_info.get("folder_type", "output")
            if not filename:
                return False, None, "No filename in asset_info"

            # Path-validated workflow subdirectory (rejects traversal attempts).
            workflow_dir = self.get_workflow_output_dir(workflow_id)
            workflow_dir.mkdir(parents=True, exist_ok=True)

            # Fetch asset bytes from ComfyUI
            logger.debug(
                f"Fetching asset: filename={filename}, subfolder={subfolder}, type={folder_type}"
            )
            res = await get_asset(
                auth=self.auth,
                filename=filename,
                subfolder=subfolder,
                asset_type=folder_type,
            )
            if not res.is_success:
                error_msg = f"Failed to fetch asset: HTTP {res.status}"
                logger.error(error_msg)
                return False, None, error_msg
            asset_bytes = res.response

            # Apply web optimization if requested. The target extension is
            # decided AFTER optimization so the saved suffix always matches
            # the actual bytes: previously the file was named .webp even when
            # optimization failed or was skipped for already-small images,
            # producing mislabeled PNG/JPEG content in .webp files.
            original_ext = Path(filename).suffix
            target_ext = original_ext
            if (
                web_optimize
                and PIL_AVAILABLE
                and original_ext.lower() in self._OPTIMIZABLE_EXTS
            ):
                try:
                    optimized_bytes = self._optimize_image(asset_bytes, max_bytes)
                    # _optimize_image returns the input object itself when the
                    # image is already small enough; only re-label as WebP when
                    # a real re-encode happened.
                    if optimized_bytes is not asset_bytes:
                        logger.debug(
                            f"Web optimized: {len(asset_bytes)} → {len(optimized_bytes)} bytes"
                        )
                        asset_bytes = optimized_bytes
                        target_ext = ".webp"
                except Exception as e:
                    # Non-fatal: fall back to the original bytes and extension.
                    logger.warning(f"Web optimization failed, using original: {e}")

            # Target filename: {workflow_id}-{timestamp}-{original_name}
            timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            target_filename = f"{workflow_id}-{timestamp}-{Path(filename).stem}{target_ext}"
            target_path = workflow_dir / target_filename

            self._atomic_write(asset_bytes, target_path)
            logger.info(f"Saved workflow output: {target_path} ({len(asset_bytes)} bytes)")
            return True, target_path, None
        except Exception as e:
            # Non-blocking contract: report the failure instead of raising.
            error_msg = f"Failed to save workflow output: {e}"
            logger.error(error_msg, exc_info=True)
            return False, None, error_msg

    @staticmethod
    def _atomic_write(data: bytes | str, target_path: Path) -> None:
        """Atomically write data to target_path (temp file in same dir + rename).

        The temp file is created in the target directory so the final rename
        stays on one filesystem (true atomic move). On failure the temp file
        is removed so aborted writes don't litter the output directory.

        Args:
            data: Bytes (written raw) or str (written as UTF-8 text)
            target_path: Final destination path

        Raises:
            OSError: On write/rename failure (after temp-file cleanup)
        """
        is_text = isinstance(data, str)
        tmp_path: Path | None = None
        try:
            with tempfile.NamedTemporaryFile(
                mode="w" if is_text else "wb",
                dir=target_path.parent,
                delete=False,
                suffix=".tmp",
                encoding="utf-8" if is_text else None,
            ) as tmp_file:
                tmp_path = Path(tmp_file.name)
                tmp_file.write(data)
            # Atomic rename
            shutil.move(str(tmp_path), str(target_path))
        except Exception:
            if tmp_path is not None:
                tmp_path.unlink(missing_ok=True)
            raise

    def _optimize_image(self, image_bytes: bytes, max_bytes: int) -> bytes:
        """Optimize image using WebP compression.

        Args:
            image_bytes: Original image bytes
            max_bytes: Maximum file size target

        Returns:
            Optimized image bytes (WebP format). If the input is already at or
            below max_bytes, the ORIGINAL bytes object is returned unchanged
            (callers can use an identity check to detect this).

        Raises:
            ImportError: If Pillow not available
            ValueError: If optimization fails
        """
        if not PIL_AVAILABLE:
            raise ImportError("Pillow required for web optimization")
        from io import BytesIO

        # If already small enough, return as-is (same object — see docstring).
        if len(image_bytes) <= max_bytes:
            return image_bytes

        # Load image
        with Image.open(BytesIO(image_bytes)) as im:
            # Convert RGBA to RGB with white background
            if im.mode in ("RGBA", "LA", "P"):
                background = Image.new("RGB", im.size, (255, 255, 255))
                if im.mode == "P":
                    im = im.convert("RGBA")
                if im.mode in ("RGBA", "LA"):
                    # Paste with alpha mask
                    alpha = im.split()[-1] if im.mode == "RGBA" else None
                    background.paste(im, mask=alpha)
                im = background
            elif im.mode != "RGB":
                im = im.convert("RGB")

            # Try descending quality levels until the target size is met.
            for quality in [85, 75, 65, 55, 45, 35]:
                output = BytesIO()
                im.save(output, format="WEBP", quality=quality)
                optimized_bytes = output.getvalue()
                if len(optimized_bytes) <= max_bytes:
                    logger.debug(
                        f"Optimized with quality={quality}: {len(image_bytes)} → {len(optimized_bytes)} bytes"
                    )
                    return optimized_bytes

            # If still too large, return best effort (last iteration's result).
            logger.warning(f"Could not optimize below {max_bytes} bytes, using quality=35")
            return optimized_bytes

    def get_workflow_output_dir(self, workflow_id: str) -> Path:
        """Get output directory for a specific workflow.

        Args:
            workflow_id: Workflow identifier

        Returns:
            Path to workflow output directory (guaranteed inside output_root)

        Raises:
            ValueError: If workflow_id would escape output_root (path traversal)
        """
        workflow_dir = self.output_root / workflow_id
        # Security: reject ids like "../evil" that would resolve outside the
        # output root (the class contract promises path validation).
        if not workflow_dir.resolve().is_relative_to(self.output_root.resolve()):
            raise ValueError(f"Invalid workflow_id (path traversal): {workflow_id!r}")
        return workflow_dir

    def save_workflow_snapshot(
        self,
        workflow_json: dict[str, Any],
        workflow_id: str,
        parameters: dict[str, Any],
        prompt_id: str | None = None,
        timestamp: str | None = None,
    ) -> tuple[bool, Path | None, str | None]:
        """Save executed workflow JSON snapshot with execution metadata.

        Saves workflow snapshots alongside generated assets for LLM training correlation.
        Filename uses matching timestamp pattern for easy pairing with asset outputs.

        Args:
            workflow_json: Executed workflow JSON (with parameters rendered)
            workflow_id: Workflow identifier
            parameters: Parameters used for execution
            prompt_id: ComfyUI prompt ID (optional)
            timestamp: ISO timestamp string (optional, generated if not provided)

        Returns:
            Tuple of (success: bool, saved_path: Path | None, error: str | None)

        Example:
            success, path, err = output_manager.save_workflow_snapshot(
                workflow_json=rendered_workflow,
                workflow_id="generate_image",
                parameters={"prompt": "a cat", "steps": 20},
                prompt_id="abc-123",
            )
        """
        try:
            import json

            # Path-validated workflow subdirectory (rejects traversal attempts).
            workflow_dir = self.get_workflow_output_dir(workflow_id)
            workflow_dir.mkdir(parents=True, exist_ok=True)

            # Generate timestamp if not provided
            if timestamp is None:
                timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")

            # Build snapshot with execution metadata for LLM training
            snapshot = {
                "workflow": workflow_json,
                "execution": {
                    "workflow_id": workflow_id,
                    "parameters": parameters,
                    "prompt_id": prompt_id,
                    "timestamp": timestamp,
                },
            }

            # Filename pattern: {workflow_id}-{timestamp}.workflow.json
            target_path = workflow_dir / f"{workflow_id}-{timestamp}.workflow.json"

            self._atomic_write(json.dumps(snapshot, indent=2), target_path)
            logger.info(f"Saved workflow snapshot: {target_path}")
            return True, target_path, None
        except Exception as e:
            # Non-blocking contract: report the failure instead of raising.
            error_msg = f"Failed to save workflow snapshot: {e}"
            logger.error(error_msg, exc_info=True)
            return False, None, error_msg

    def list_workflow_outputs(self, workflow_id: str) -> list[Path]:
        """List all saved outputs for a workflow.

        Args:
            workflow_id: Workflow identifier

        Returns:
            List of output file paths, most recent first

        Raises:
            ValueError: If workflow_id would escape output_root (path traversal)
        """
        workflow_dir = self.get_workflow_output_dir(workflow_id)
        if not workflow_dir.exists():
            return []
        return sorted(
            [f for f in workflow_dir.iterdir() if f.is_file()],
            key=lambda p: p.stat().st_mtime,
            reverse=True,  # Most recent first
        )