"""Asset orchestration layer.
Manages asset lifecycle: registration, retrieval, expiration tracking.
Uses route functions for HTTP transport, provides business logic for asset management.
"""
import threading
import uuid
from datetime import datetime, timedelta
from typing import Any
from src.auth.base import ComfyAuth
from src.models.asset import AssetRecord
from src.routes.assets import get_asset, get_asset_metadata
from src.utils import get_global_logger
logger = get_global_logger("ComfyUI_MCP.orchestrators.asset")
def _make_asset_key(filename: str, subfolder: str, folder_type: str) -> str:
"""Create a stable lookup key from asset identity.
Args:
filename: Asset filename
subfolder: Asset subfolder
folder_type: Asset folder type (e.g., "output")
Returns:
Stable key for asset lookup
"""
return f"{folder_type}:{subfolder}:{filename}"
class AssetOrchestrator:
"""Orchestrator for asset management operations.
Manages in-memory asset registry with TTL expiration, thread-safe operations,
and asset retrieval via route functions.
Design Pattern:
- Uses (filename, subfolder, type) as stable identity instead of URLs
- Thread-safe with RLock for concurrent access
- TTL-based expiration with automatic cleanup
- Provenance tracking (comfy_history, submitted_workflow)
"""
def __init__(self, auth: ComfyAuth, ttl_hours: int = 24):
"""Initialize asset orchestrator.
Args:
auth: ComfyAuth instance for API access
ttl_hours: Asset time-to-live in hours (default: 24)
"""
self.auth = auth
self.ttl_hours = ttl_hours
# In-memory storage (ephemeral)
self._assets: dict[str, AssetRecord] = {} # asset_id -> AssetRecord
self._asset_key_to_id: dict[str, str] = {} # stable identity -> asset_id
self._lock = threading.RLock() # Reentrant lock for thread safety
logger.info(f"Initialized AssetOrchestrator with TTL: {ttl_hours} hours")
def register_asset(
self,
filename: str,
subfolder: str,
folder_type: str,
workflow_id: str,
prompt_id: str,
mime_type: str | None = None,
width: int | None = None,
height: int | None = None,
bytes_size: int | None = None,
comfy_history: dict[str, Any] | None = None,
submitted_workflow: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
session_id: str | None = None,
) -> AssetRecord:
"""Register a new asset and return AssetRecord with asset_id.
Uses (filename, subfolder, type) as stable identity. Handles deduplication
by returning existing record if same identity found and not expired.
Args:
filename: Asset filename (e.g., "ComfyUI_00123.png")
subfolder: Asset subfolder (e.g., "" or "my_folder")
folder_type: Asset folder type (e.g., "output" or "temp")
workflow_id: Workflow that generated this asset
prompt_id: ComfyUI prompt ID (links to history)
mime_type: Content type (e.g., "image/png")
width: Image width in pixels
height: Image height in pixels
bytes_size: File size in bytes
comfy_history: Full /history/{prompt_id} response (provenance)
submitted_workflow: Original workflow submitted (for regeneration)
metadata: Additional metadata dictionary
session_id: Session identifier for conversation isolation
Returns:
AssetRecord with generated asset_id and metadata
"""
with self._lock:
# Create stable lookup key
asset_key = _make_asset_key(filename, subfolder, folder_type)
# Check if asset already exists (deduplication)
existing_id = self._asset_key_to_id.get(asset_key)
if existing_id and existing_id in self._assets:
existing = self._assets[existing_id]
# Check if expired
if existing.expires_at and datetime.now() > existing.expires_at:
logger.debug(f"Existing asset {asset_key} expired, removing")
del self._assets[existing_id]
del self._asset_key_to_id[asset_key]
else:
# Update existing asset with new metadata/history if provided
if comfy_history is not None:
existing.comfy_history = comfy_history
if submitted_workflow is not None:
existing.submitted_workflow = submitted_workflow
logger.debug(
f"Asset {asset_key} already registered, returning existing: {existing_id}"
)
return existing
# Generate new asset_id (UUID for uniqueness)
asset_id = str(uuid.uuid4())
# Calculate expiration
expires_at = datetime.now() + timedelta(hours=self.ttl_hours)
# Create record
record = AssetRecord(
asset_id=asset_id,
filename=filename,
subfolder=subfolder,
folder_type=folder_type,
prompt_id=prompt_id,
workflow_id=workflow_id,
created_at=datetime.now(),
expires_at=expires_at,
mime_type=mime_type or "application/octet-stream",
width=width,
height=height,
bytes_size=bytes_size or 0,
sha256=None, # Could be computed if needed
comfy_history=comfy_history,
submitted_workflow=submitted_workflow,
metadata=metadata or {},
session_id=session_id,
)
# Set base URL for asset URL computation
record.set_base_url(self.auth.base_url)
# Store in registry
self._assets[asset_id] = record
self._asset_key_to_id[asset_key] = asset_id
logger.info(f"Registered asset {asset_id} ({asset_key}) for workflow {workflow_id}")
return record
def get_asset_record(self, asset_id: str) -> AssetRecord | None:
"""Retrieve asset record by ID, checking expiration.
Args:
asset_id: Asset UUID
Returns:
AssetRecord if found and not expired, None otherwise
"""
with self._lock:
record = self._assets.get(asset_id)
if not record:
logger.debug(f"Asset {asset_id} not found in registry")
return None
# Check expiration
if record.expires_at and datetime.now() > record.expires_at:
logger.info(f"Asset {asset_id} has expired, removing")
asset_key = _make_asset_key(record.filename, record.subfolder, record.folder_type)
del self._assets[asset_id]
if asset_key in self._asset_key_to_id:
del self._asset_key_to_id[asset_key]
return None
return record
def get_asset_by_identity(
self, filename: str, subfolder: str, folder_type: str
) -> AssetRecord | None:
"""Get asset record by stable identity.
Args:
filename: Asset filename
subfolder: Asset subfolder
folder_type: Asset folder type
Returns:
AssetRecord if found and not expired, None otherwise
"""
with self._lock:
asset_key = _make_asset_key(filename, subfolder, folder_type)
asset_id = self._asset_key_to_id.get(asset_key)
if not asset_id:
logger.debug(f"Asset identity {asset_key} not found")
return None
return self.get_asset_record(asset_id) # This checks expiration
def list_assets(
self, limit: int = 10, workflow_id: str | None = None, session_id: str | None = None
) -> list[AssetRecord]:
"""List recent assets with optional filtering.
Returns assets sorted by creation time (newest first).
Automatically cleans up expired assets before listing.
Args:
limit: Maximum number of assets to return
workflow_id: Filter by workflow type (e.g., "generate_image")
session_id: Filter by session ID for conversation isolation
Returns:
List of AssetRecords matching criteria
"""
with self._lock:
# Cleanup expired first
self.cleanup_expired()
# Collect all assets
assets = list(self._assets.values())
# Apply filters
if workflow_id:
assets = [a for a in assets if a.workflow_id == workflow_id]
if session_id:
assets = [a for a in assets if a.session_id == session_id]
# Sort by creation time (newest first)
assets.sort(key=lambda a: a.created_at, reverse=True)
# Apply limit
result = assets[:limit]
logger.debug(
f"Listed {len(result)} assets "
f"(workflow_id={workflow_id}, session_id={session_id})"
)
return result
def cleanup_expired(self) -> int:
"""Remove expired assets from registry.
Returns:
Number of assets removed
"""
with self._lock:
now = datetime.now()
expired_ids = [
asset_id
for asset_id, record in self._assets.items()
if record.expires_at and now > record.expires_at
]
for asset_id in expired_ids:
record = self._assets[asset_id]
asset_key = _make_asset_key(record.filename, record.subfolder, record.folder_type)
del self._assets[asset_id]
if asset_key in self._asset_key_to_id:
del self._asset_key_to_id[asset_key]
if expired_ids:
logger.info(f"Cleaned up {len(expired_ids)} expired assets")
return len(expired_ids)
async def fetch_asset_bytes(self, asset_record: AssetRecord) -> bytes:
"""Fetch asset bytes from ComfyUI using route function.
Args:
asset_record: AssetRecord with stable identity
Returns:
Asset bytes
Raises:
RuntimeError: If asset fetch fails
"""
logger.debug(f"Fetching asset bytes for {asset_record.asset_id}")
# Use route function to fetch asset
res = await get_asset(
auth=self.auth,
filename=asset_record.filename,
subfolder=asset_record.subfolder,
folder_type=asset_record.folder_type,
)
if not res.is_success:
error_msg = f"Failed to fetch asset: HTTP {res.status}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.debug(f"Fetched {len(res.response)} bytes for asset {asset_record.asset_id}")
return res.response
async def fetch_asset_metadata_remote(
self, filename: str, subfolder: str, folder_type: str
) -> dict[str, Any]:
"""Fetch asset metadata from ComfyUI (HEAD request).
Uses route function to get metadata without downloading full asset.
Args:
filename: Asset filename
subfolder: Asset subfolder
folder_type: Asset folder type
Returns:
Metadata dictionary with mime_type, width, height, bytes_size
Raises:
RuntimeError: If metadata fetch fails
"""
logger.debug(f"Fetching metadata for {folder_type}:{subfolder}:{filename}")
res = await get_asset_metadata(
auth=self.auth,
filename=filename,
subfolder=subfolder,
folder_type=folder_type,
)
if not res.is_success:
error_msg = f"Failed to fetch asset metadata: HTTP {res.status}"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.debug(f"Fetched metadata: {res.response}")
return res.response
def register_and_build_response(
self,
result: dict[str, Any],
workflow_id: str,
tool_name: str | None = None,
session_id: str | None = None,
) -> dict[str, Any]:
"""Helper to register asset and build standardized response.
Eliminates code duplication across tools.
Args:
result: Result dict from workflow execution (contains asset info)
workflow_id: Workflow identifier
tool_name: Optional tool name (for workflow-backed tools)
session_id: Optional session identifier
Returns:
Standardized response dictionary with asset_id, asset_url, metadata
"""
# Extract asset metadata
asset_metadata = result.get("asset_metadata", {})
metadata = {"workflow_id": workflow_id}
if tool_name:
metadata["tool"] = tool_name
# Register asset
asset_record = self.register_asset(
filename=result.get("filename", ""),
subfolder=result.get("subfolder", ""),
folder_type=result.get("folder_type", "output"),
workflow_id=workflow_id,
prompt_id=result.get("prompt_id", ""),
mime_type=asset_metadata.get("mime_type"),
width=asset_metadata.get("width"),
height=asset_metadata.get("height"),
bytes_size=asset_metadata.get("bytes_size"),
comfy_history=result.get("comfy_history"),
submitted_workflow=result.get("submitted_workflow"),
metadata=metadata,
session_id=session_id,
)
# Build response
asset_url = asset_record.asset_url or result.get("asset_url", "")
response_data = {
"asset_id": asset_record.asset_id,
"asset_url": asset_url,
"image_url": asset_url, # Backward compatibility
"filename": asset_record.filename,
"subfolder": asset_record.subfolder,
"folder_type": asset_record.folder_type,
"workflow_id": workflow_id,
"prompt_id": result.get("prompt_id"),
"mime_type": asset_record.mime_type,
"width": asset_record.width,
"height": asset_record.height,
"bytes_size": asset_record.bytes_size,
}
if tool_name:
response_data["tool"] = tool_name
logger.info(
f"Built response for asset {asset_record.asset_id} "
f"(workflow={workflow_id}, tool={tool_name})"
)
return response_data