"""Helper functions for MCP tool registration and response building
Shared utilities for tool implementations.
"""
from typing import Any
from src.orchestrators.asset import AssetOrchestrator
from src.utils import get_global_logger
from src.utils.context import get_correlation_id
# Module-level logger used by the MCP tool helper functions in this file.
logger = get_global_logger("MCP_Server.tools.helpers")
def _normalize_output(result: dict[str, Any]) -> dict[str, Any] | None:
    """Return the asset description from ``result``, or None if there is none.

    Two result schemas are supported: a non-empty nested ``output`` dict
    (older tool paths) and the flat orchestrator schema
    (``filename`` / ``subfolder`` / ``folder_type`` at the top level).
    """
    nested = result.get("output")
    if isinstance(nested, dict) and nested:
        return nested

    # Flat orchestrator schema:
    # {
    #     "filename": "...",
    #     "subfolder": "...",
    #     "folder_type": "output",
    #     "prompt_id": "...",
    #     "comfy_history": {...},
    #     "submitted_workflow": {...}
    # }
    if not result.get("filename"):
        return None

    return {
        "filename": result.get("filename"),
        "subfolder": result.get("subfolder", ""),
        "folder_type": result.get("folder_type", "output"),
        # Optional fields (may be absent)
        "width": result.get("width"),
        "height": result.get("height"),
        "size": result.get("bytes_size") or result.get("size"),
        "mime_type": result.get("mime_type"),
        "metadata": result.get("metadata", {}),
    }


def _infer_image_mime_type(filename: Any) -> str | None:
    """Guess a MIME type from the filename suffix (PNG, JPEG and WebP only)."""
    lowered = str(filename or "").lower()
    if lowered.endswith(".png"):
        return "image/png"
    if lowered.endswith((".jpg", ".jpeg")):
        return "image/jpeg"
    if lowered.endswith(".webp"):
        return "image/webp"
    return None


def register_and_build_response(
    result: dict[str, Any],
    workflow_id: str,
    asset_orchestrator: AssetOrchestrator,
    tool_name: str | None = None,
    return_inline_preview: bool = False,
    session_id: str | None = None,
) -> dict[str, Any]:
    """Register the asset described by ``result`` and build the MCP response.

    Args:
        result: Result dict from workflow execution. Either carries asset info
            in a nested ``output`` dict or uses the flat orchestrator schema
            (``filename``, ``subfolder``, ``folder_type``, ...). If it contains
            an ``error`` key it is returned unchanged.
        workflow_id: Workflow identifier; also used as the ``tool_name``
            metadata value when ``tool_name`` is not given.
        asset_orchestrator: Orchestrator used to register the asset, resolve
            its URL and (for previews) fetch its bytes.
        tool_name: Optional tool name stored in the asset metadata.
        return_inline_preview: If True and the registered asset has an image
            MIME type, attach a base64 preview to the response.
        session_id: Optional session identifier passed to asset registration.

    Returns:
        A response dict with ``asset_id``, ``asset_url``, ``filename``,
        ``workflow_id``, ``prompt_id`` and ``mime_type``, plus ``width``,
        ``height`` and ``size`` when truthy on the asset record and
        ``inline_preview_base64`` / ``inline_preview_format`` when a preview
        was generated. On failure a dict with a single ``error`` key is
        returned instead; exceptions are logged, not propagated.
    """
    try:
        if "error" in result:
            return result

        output = _normalize_output(result)
        if output is None:
            return {"error": "No output returned from workflow execution"}

        # Prefer an explicit MIME type; otherwise fall back to a best-effort
        # guess from the filename, keeping the original value if no guess fits.
        mime_type = output.get("mime_type") or output.get("type")
        if not mime_type:
            mime_type = _infer_image_mime_type(output.get("filename")) or mime_type

        # Correlation ID from context, stored for traceability.
        correlation_id = get_correlation_id()

        # ``metadata`` may be present but None; ``.get(key, {})`` would then
        # return None and ``**None`` would raise, turning a successful run into
        # an error response. Coalesce to an empty dict instead.
        extra_metadata = output.get("metadata") or {}

        asset_record = asset_orchestrator.register_asset(
            filename=output.get("filename"),
            subfolder=output.get("subfolder", ""),
            folder_type=output.get("folder_type", "output"),
            mime_type=mime_type,
            width=output.get("width"),
            height=output.get("height"),
            bytes_size=output.get("size"),
            workflow_id=workflow_id,
            prompt_id=result.get("prompt_id"),
            submitted_workflow=result.get("submitted_workflow")
            or result.get("workflow")
            or result.get("submitted"),
            comfy_history=result.get("comfy_history") or result.get("history"),
            session_id=session_id,
            metadata={
                "tool_name": tool_name or workflow_id,
                "correlation_id": correlation_id,
                # Spread last: workflow-provided keys win on collision.
                **extra_metadata,
            },
        )

        response: dict[str, Any] = {
            "asset_id": asset_record.asset_id,
            "asset_url": asset_orchestrator.get_asset_url(asset_record),
            "filename": asset_record.filename,
            "workflow_id": workflow_id,
            "prompt_id": result.get("prompt_id"),
            "mime_type": asset_record.mime_type,
        }

        # Dimensional info is only included when truthy on the record.
        if asset_record.width:
            response["width"] = asset_record.width
        if asset_record.height:
            response["height"] = asset_record.height
        if asset_record.bytes_size:
            response["size"] = asset_record.bytes_size

        # Inline preview is best-effort and only attempted for image assets;
        # a failure here is logged and must not fail the whole response.
        if return_inline_preview and asset_record.mime_type and "image" in asset_record.mime_type:
            try:
                from src.utils.asset_processor import encode_preview_for_mcp, get_cache_key

                image_bytes = asset_orchestrator.fetch_asset_bytes(asset_record)
                cache_key = get_cache_key(asset_record.asset_id, 256, 70)
                encoded = encode_preview_for_mcp(
                    image_bytes, max_dim=256, max_b64_chars=100_000, quality=70, cache_key=cache_key
                )
                response["inline_preview_base64"] = encoded.b64_str
                # NOTE(review): format is reported as "webp" unconditionally;
                # presumably encode_preview_for_mcp always emits WebP - confirm.
                response["inline_preview_format"] = "webp"
                logger.info(
                    f"Added inline preview for {asset_record.asset_id}: {encoded.b64_chars} chars"
                )
            except Exception as e:
                logger.warning(f"Failed to generate inline preview: {e}")

        return response
    except Exception as e:
        # Top-level boundary for tool responses: log with traceback and report
        # the failure as an error dict rather than raising to the caller.
        logger.exception("Failed to register asset and build response")
        return {"error": f"Failed to process result: {e}"}