# Orchestration Layer - Implementation Guide
**Purpose**: Define patterns for the orchestration layer that coordinates business logic and route function calls.
---
## Technical Stack
- **Language**: Python 3.10+
- **Core Libraries**:
  - `asyncio` - Async/await coordination and sync-to-async bridging
  - `logging` - Structured logging (stdlib)
  - `threading` - Thread-safe operations (AssetOrchestrator)
  - `fastmcp` - FastMCP v3 Context integration for progress reporting
- **Key Patterns**:
  - Dependency Injection (Auth objects, managers)
  - Sync-to-Async bridging (`asyncio.run()`)
  - Polling with timeout and retry logic
  - Domain object creation from API responses
  - FastMCP v3 Context integration (progress reporting, structured logging)
  - Null Object Pattern (optional dependencies without Optional types)
---
## FastMCP v3 Integration
### Context Parameter Pattern
FastMCP v3 provides a `Context` object for progress reporting and structured logging. Orchestrators accept an optional `ctx` parameter:
```python
from typing import Any

# Import with fallback
try:
    from fastmcp import Context
except ImportError:
    Context = None  # type: ignore


async def execute_and_wait(
    self,
    workflow: dict,
    ctx: Any = None,  # Optional Context (use Any to avoid type errors)
    max_attempts: int = 300,
    poll_interval: float = 1.0,
) -> dict:
    """Execute workflow with optional progress reporting."""
    # Progress: Starting
    if ctx and Context:
        await ctx.info("Queueing workflow for execution")
        await ctx.report_progress(0.0, "Workflow queued")

    # ... do work ...

    # Progress: Milestone (every 30 seconds)
    elapsed = attempt * poll_interval
    if elapsed % 30 == 0:
        if ctx and Context:
            progress = min(0.9, attempt / max_attempts)
            await ctx.report_progress(
                progress,
                f"Processing... ({elapsed:.0f}s elapsed)"
            )

    # Progress: Complete
    if ctx and Context:
        await ctx.report_progress(1.0, "Workflow complete")
        await ctx.info(f"Extracted {len(assets)} assets")

    return result
```
**Key Principles**:
1. **Always Optional**: Context is never required, code works without it
2. **Type Safety**: Use `ctx: Any = None` to avoid type checking issues
3. **Runtime Check**: Always check `if ctx and Context:` before using
4. **Graceful Fallback**: Use `logger.info()` when Context unavailable
5. **Progress Milestones**: Report at 0%, every 30s, and 100%
### Structured Logging with Context
```python
# With Context (structured, agent-visible)
if ctx and Context:
    await ctx.info("Starting workflow execution")
    await ctx.error(f"Failed to extract assets: {exc}")
else:
    # Fallback to stdlib logging
    logger.info("Starting workflow execution")
    logger.error("Failed to extract assets", exc_info=exc)
```
**Benefits**:
- Real-time feedback to AI agents via MCP protocol
- Progress bars in MCP clients
- Structured event logs for debugging
- No code duplication (works with or without Context)
---
## Orchestration Layer Responsibilities
### What Belongs Here
- ✅ Business logic and workflow coordination
- ✅ Data validation and transformation
- ✅ Polling and retry logic
- ✅ Multiple route function coordination
- ✅ Response parsing and domain object creation
- ✅ Error handling and recovery strategies
### What Does NOT Belong Here
- ❌ HTTP transport (use route functions)
- ❌ Auth header construction (pass Auth objects)
- ❌ MCP tool registration (belongs in tools layer)
- ❌ Raw HTTP client usage (use routes)
---
## Orchestrator Pattern
### Template
```python
"""Module docstring describing orchestrator purpose."""
import asyncio
from typing import Any, Dict, Optional
from auth.base import ComfyAuth
from routes import queue_workflow, get_prompt_history
from client.response import ResponseGetData
from src.utils import get_global_logger
# Initialize logger with automatic correlation ID support
logger = get_global_logger("ComfyUI_MCP.orchestrators.my_orchestrator")
class MyOrchestrator:
"""Orchestrator for [specific domain]."""
def __init__(self, auth: ComfyAuth):
"""Initialize orchestrator with auth.
Args:
auth: ComfyAuth instance for API access
"""
self.auth = auth
self.logger = get_global_logger(__name__)
def public_method(self, param: str) -> Dict[str, Any]:
"""Public method (sync) that bridges to async routes.
Args:
param: Parameter description
Returns:
Processed result dictionary
Raises:
ValueError: When validation fails
RuntimeError: When operation fails
"""
self.logger.info(f"Starting operation with param: {param}")
try:
# 1. Validate inputs
validated = self._validate_input(param)
self.logger.debug(f"Validation passed for param: {validated}")
# 2. Execute async operation
result = asyncio.run(self._async_operation(validated))
# 3. Transform result
transformed = self._transform_result(result)
self.logger.info(f"Operation completed successfully")
return transformed
except ValueError as e:
self.logger.error(f"Validation failed: {e}")
raise
except RuntimeError as e:
self.logger.error(f"Operation failed: {e}", exc_info=True)
raise
async def _async_operation(self, param: str) -> ResponseGetData:
"""Internal async method that calls route functions.
Args:
param: Validated parameter
Returns:
ResponseGetData from route function
"""
# Call route function (NOT direct HTTP!)
res = await queue_workflow(
auth=self.auth,
workflow={"param": param},
)
if not res.is_success:
raise RuntimeError(f"Operation failed: HTTP {res.status}")
return res
def _validate_input(self, param: str) -> str:
"""Validation logic (private method)."""
if not param:
raise ValueError("Parameter required")
return param.strip()
def _transform_result(self, res: ResponseGetData) -> Dict[str, Any]:
"""Transform route response to domain object."""
return {
"id": res.response["prompt_id"],
"status": "queued",
}
```
---
## Logging in Orchestrators
### Logger Setup
Each orchestrator module should use the global correlation logger:
```python
from src.utils import get_global_logger

# Module-level logger (correlation-aware)
logger = get_global_logger(__name__)


class WorkflowOrchestrator:
    def __init__(self, auth: ComfyAuth):
        # Use module-level logger (already correlation-aware)
        self.logger = logger
        self.auth = auth
```
### What to Log
#### ✅ Business Logic Events
- State transitions (workflow queued, completed)
- Validation results (overrides applied, invalid params)
- Cache operations (hit, miss, cleanup)
- Resource management (files loaded, expired assets removed)
- Polling progress (waiting for completion, timeout)
```python
class WorkflowOrchestrator:
    def queue_workflow(self, workflow: dict) -> dict:
        self.logger.info(f"Queueing workflow with {len(workflow)} nodes")
        res = asyncio.run(queue_workflow(auth=self.auth, workflow=workflow))
        prompt_id = res.response["prompt_id"]
        self.logger.info(f"Workflow queued successfully: {prompt_id}")
        return {"prompt_id": prompt_id, "status": "queued"}
```
#### ✅ Error Conditions
- Validation failures
- Route failures
- Business logic errors
- Resource not found
```python
class WorkflowOrchestrator:
    def load_workflow(self, workflow_id: str) -> dict:
        try:
            self.logger.debug(f"Loading workflow: {workflow_id}")
            workflow = self._load_from_disk(workflow_id)
            return workflow
        except FileNotFoundError:
            self.logger.error(f"Workflow not found: {workflow_id}")
            raise
        except Exception as e:
            self.logger.error(
                f"Failed to load workflow {workflow_id}: {e}",
                exc_info=True  # Include stack trace
            )
            raise
```
#### ✅ Performance Metrics
- Operation duration
- Polling iterations
- Retry attempts
- Batch sizes
```python
import time


class WorkflowOrchestrator:
    def execute_with_polling(self, workflow: dict) -> dict:
        start_time = time.time()
        res = asyncio.run(queue_workflow(auth=self.auth, workflow=workflow))
        prompt_id = res.response["prompt_id"]

        iterations = 0
        while iterations < 300:
            iterations += 1
            # Poll for completion...

        duration = time.time() - start_time
        self.logger.info(
            f"Workflow completed in {duration:.2f}s after {iterations} polls"
        )
```
#### ❌ DO NOT Log
- HTTP request/response details (handled by routes layer)
- Auth headers or tokens (security risk)
- Large payloads (use summary instead)
- Every loop iteration (use milestones)
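When a payload is too large to log, a summary keeps the signal without the bulk. The helper below is an illustrative sketch, not an existing utility:

```python
def summarize_payload(payload: dict, max_keys: int = 5) -> str:
    """Return a short, loggable summary instead of dumping the full payload."""
    keys = list(payload.keys())
    shown = ", ".join(keys[:max_keys])
    # Indicate how many keys were omitted rather than printing them all
    suffix = f", ... ({len(keys) - max_keys} more)" if len(keys) > max_keys else ""
    return f"{len(keys)} keys: {shown}{suffix}"
```

A call site might then log `self.logger.info(f"Queueing workflow ({summarize_payload(workflow)})")` instead of the workflow itself.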
### Logging Levels
**DEBUG**: Internal state, validation details, cache operations
```python
self.logger.debug(f"Cache hit for workflow: {workflow_id}")
self.logger.debug(f"Applied {len(overrides)} parameter overrides")
```
**INFO**: Business events, successful operations, key milestones
```python
self.logger.info(f"Workflow queued: {prompt_id}")
self.logger.info(f"Asset registered: {asset_id}")
```
**WARNING**: Recoverable issues, fallbacks, invalid input
```python
self.logger.warning(f"Invalid override key '{key}' ignored")
self.logger.warning(f"Asset expired, using default TTL")
```
**ERROR**: Operation failures, exceptions
```python
self.logger.error(f"Failed to queue workflow: {e}", exc_info=True)
self.logger.error(f"Timeout after {timeout}s waiting for completion")
```
### Structured Logging Pattern
For complex operations, use structured context:
```python
import time


class WorkflowOrchestrator:
    def execute_and_wait(self, workflow: dict, timeout: int = 300) -> dict:
        """Execute workflow with structured logging."""
        context = {
            "node_count": len(workflow),
            "timeout": timeout,
        }
        self.logger.info("Starting workflow execution", extra=context)
        try:
            # Queue workflow
            res = asyncio.run(queue_workflow(auth=self.auth, workflow=workflow))
            prompt_id = res.response["prompt_id"]
            context["prompt_id"] = prompt_id
            self.logger.info("Workflow queued, polling for completion", extra=context)

            # Poll for completion
            start_time = time.time()
            iterations = 0
            while time.time() - start_time < timeout:
                iterations += 1
                res = asyncio.run(get_prompt_history(
                    auth=self.auth,
                    prompt_id=prompt_id,
                ))
                history = res.response.get(prompt_id, {})
                if history.get("status", {}).get("completed", False):
                    duration = time.time() - start_time
                    context.update({
                        "duration_s": round(duration, 2),
                        "iterations": iterations,
                    })
                    self.logger.info("Workflow completed", extra=context)
                    return self._extract_outputs(history)

                # Log milestone every 30 seconds
                if iterations % 30 == 0:
                    elapsed = time.time() - start_time
                    self.logger.debug(
                        f"Still waiting... {elapsed:.0f}s elapsed",
                        extra={**context, "iterations": iterations}
                    )
                time.sleep(1)

            # Timeout
            context["iterations"] = iterations
            self.logger.error("Workflow timeout", extra=context)
            raise TimeoutError(f"Workflow {prompt_id} timeout after {timeout}s")
        except Exception as e:
            self.logger.error(
                f"Workflow execution failed: {e}",
                extra=context,
                exc_info=True
            )
            raise
```
### Sensitive Data Handling
Never log sensitive information:
```python
class WorkflowOrchestrator:
    def apply_overrides(self, workflow: dict, overrides: dict) -> dict:
        """Apply overrides with safe logging."""
        # ❌ BAD: Might log sensitive values
        # self.logger.info(f"Applying overrides: {overrides}")

        # ✅ GOOD: Log keys only
        self.logger.info(f"Applying {len(overrides)} overrides: {list(overrides.keys())}")

        # ✅ GOOD: Sanitize sensitive keys
        safe_overrides = {
            k: "[REDACTED]" if "token" in k.lower() or "key" in k.lower() else v
            for k, v in overrides.items()
        }
        self.logger.debug(f"Override values: {safe_overrides}")
```
---
## Implemented Orchestrators
### AssetOrchestrator ([src/orchestrators/asset.py](asset.py))
**Purpose**: Manages asset lifecycle with TTL expiration, thread-safe operations, and provenance tracking.
**Key Features**:
- In-memory registry with stable identity `(filename, subfolder, folder_type)`
- Thread-safe with `threading.RLock()`
- TTL-based expiration (default: 24 hours)
- Provenance tracking (comfy_history, submitted_workflow)
- Uses routes: `get_asset()`, `get_asset_metadata()`
**Key Methods**:
```python
asset_orch = AssetOrchestrator(auth=auth, ttl_hours=24)

# Register asset with provenance
record = asset_orch.register_asset(
    filename="ComfyUI_00123.png",
    subfolder="",
    folder_type="output",
    workflow_id="generate_image",
    prompt_id="abc-123",
    comfy_history={...},      # Full history for regeneration
    submitted_workflow={...}  # Original workflow
)

# Retrieve with expiration check
record = asset_orch.get_asset_record(asset_id="uuid-123")

# Fetch bytes via route (avoid shadowing the `bytes` builtin)
data = await asset_orch.fetch_asset_bytes(record)

# List with filters
assets = asset_orch.list_assets(limit=10, workflow_id="generate_image", session_id="conv-123")

# Cleanup expired
removed = asset_orch.cleanup_expired()
```
### WorkflowOrchestrator ([src/orchestrators/workflow.py](workflow.py))
**Purpose**: Workflow execution with polling and asset extraction. Split into focused classes following SOLID principles.
**Classes**:
- `ParameterExtractor`: Parse PARAM_ placeholders (single responsibility)
- `WorkflowLoader`: File I/O and catalog management
- `WorkflowRenderer`: Parameter binding and type coercion
- `WorkflowOrchestrator`: Execute workflows via routes with polling
**FastMCP v3 Integration** ✅:
- **Context Support**: Progress reporting via FastMCP Context
- **Required Dependencies**: No Optional types, explicit dependency injection
- **Resources Available**: Workflows exposed as MCP Resources for inspection
**Key Features**:
- Uses routes: `queue_workflow()`, `get_prompt_history()`
- Polling with configurable timeout and interval
- Asset extraction from workflow outputs
- Comprehensive logging throughout
- Progress reporting milestones every 30 seconds
- Context passthrough for MCP tool integration
**Key Methods**:
```python
# Load workflows from disk
loader = WorkflowLoader(workflows_dir=Path("./workflows"))
workflow = loader.load_workflow("generate_image")
catalog = loader.get_workflow_catalog()
definitions = loader.load_tool_definitions()

# Extract parameters
parameters = ParameterExtractor.extract_parameters(workflow)

# Render workflow with parameters
definition = WorkflowToolDefinition(...)
rendered = WorkflowRenderer.render_workflow(
    definition=definition,
    provided_params={"prompt": "a cat", "steps": 30},
    defaults_provider=defaults_manager
)

# Initialize orchestrator with REQUIRED dependencies (FastMCP v3 pattern)
orch = WorkflowOrchestrator(
    auth=auth,
    workflow_loader=loader,             # Required
    defaults_manager=defaults_manager,  # Required
)

# Execute and wait with Context support
result = await orch.execute_and_wait(
    workflow=rendered,
    max_attempts=300,
    poll_interval=1.0,
    output_preferences=("images", "image"),
    ctx=context,  # Optional FastMCP Context for progress reporting
)
# If ctx provided:
# - Reports progress at 0%, milestones, and 100%
# - Logs structured events via ctx.info(), ctx.error()
# - Provides real-time feedback to agents

# High-level execution with parameter rendering
result = await orch.execute_workflow(
    auth=auth,
    workflow_id="generate_image",
    parameters={"prompt": "a cat", "steps": 30},
    ctx=context,  # Context passthrough for progress
)
```
**FastMCP v3 Best Practices Applied**:
1. **Explicit Dependency Injection**
```python
# ✅ Required dependencies - errors caught at construction
def __init__(
    self,
    auth: ComfyAuth,
    workflow_loader: WorkflowLoader,    # Required, not Optional
    defaults_manager: DefaultsManager,  # Required, not Optional
):
    ...
```
2. **Context Integration**
```python
# Progress reporting through FastMCP Context
if ctx and Context:
    await ctx.info("Queueing workflow for execution")
    await ctx.report_progress(0.0, "Workflow queued")
    # ... during polling ...
    await ctx.report_progress(0.5, "Processing...")
    # ... on completion ...
    await ctx.report_progress(1.0, "Complete")
```
3. **Graceful Fallback**
```python
# Works with or without Context
if ctx and Context:
    await ctx.info(message)  # Structured logging via MCP
else:
    logger.info(message)  # Fallback to stdlib logging
```
### DefaultsManager ([src/orchestrators/defaults.py](defaults.py))
**Purpose**: Manages default parameter values with precedence chain. Decoupled from ComfyUIClient using dependency injection.
**Precedence Chain** (highest to lowest):
1. per-call: Values provided in tool calls
2. runtime: Values set via `set_defaults()`
3. config: Values from `~/.config/comfy-mcp/config.json`
4. env: Environment variables (COMFY_MCP_DEFAULT_IMAGE_MODEL, etc.)
5. hardcoded: Built-in defaults
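The chain above amounts to a first-non-None lookup. A minimal sketch follows; the function and parameter names are illustrative (the real resolution lives in `DefaultsManager.get_default`, and the actual env-var lookup is namespaced):

```python
import os

def resolve_default(key: str, provided_value, runtime: dict, config: dict, hardcoded: dict):
    """Return the first non-None value: per-call > runtime > config > env > hardcoded."""
    env_value = os.environ.get(f"COMFY_MCP_DEFAULT_{key.upper()}")
    for candidate in (
        provided_value,      # 1. per-call value from the tool call
        runtime.get(key),    # 2. runtime set_defaults()
        config.get(key),     # 3. config file
        env_value,           # 4. environment variable
        hardcoded.get(key),  # 5. built-in default
    ):
        if candidate is not None:
            return candidate
    return None
```

Note that an explicit per-call value short-circuits everything below it, which is why tool calls can always override configured defaults.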
**Key Features**:
- Dependency Inversion: Accepts `model_validator: Callable[[str], bool]`
- No tight coupling to ComfyUIClient
- Persistent configuration support
- Namespace-based (image, audio, video)
- **Null Object Pattern**: NullDefaultsManager for optional defaults (FastMCP v3)
**Key Methods**:
```python
# Initialize with optional validator
def validate_model(model_name: str) -> bool:
    # Call route to check if model exists
    return model_name in available_models

defaults_mgr = DefaultsManager(model_validator=validate_model)

# Get with precedence
value = defaults_mgr.get_default(namespace="image", key="steps", provided_value=None)

# Set runtime defaults
defaults_mgr.set_defaults(
    namespace="image",
    defaults={"model": "v1-5-pruned.ckpt", "steps": 25},
    validate_models=True
)

# Persist to config file
defaults_mgr.persist_defaults(namespace="image", defaults={"model": "..."})

# Get all effective defaults
all_defaults = defaults_mgr.get_all_defaults()
```
**Null Object Pattern** (FastMCP v3):
When defaults are optional (e.g., testing, minimal setups), use `NullDefaultsManager`:
```python
from orchestrators.defaults import DefaultsManager, NullDefaultsManager

# ✅ Production: Full defaults support
defaults = DefaultsManager()
orch = WorkflowOrchestrator(
    auth=auth,
    workflow_loader=loader,
    defaults_manager=defaults,
)

# ✅ Testing/Minimal: No defaults (Null Object)
null_defaults = NullDefaultsManager()
orch = WorkflowOrchestrator(
    auth=auth,
    workflow_loader=loader,
    defaults_manager=null_defaults,  # Always returns None
)

# NullDefaultsManager behavior:
null_defaults.get_default("image_generation", "steps")  # → None
null_defaults.get_defaults("image_generation")          # → {}
null_defaults.set_default(...)                          # No-op
null_defaults.clear_defaults(...)                       # No-op
```
**Why Null Object Pattern?**
1. **No Optional Types**: Dependencies are always required, no `Optional[DefaultsManager]`
2. **No None Checks**: Orchestrator code never checks `if defaults_manager:`
3. **Polymorphic**: Both implement same interface, swappable at runtime
4. **Testing Simplicity**: Tests can use `NullDefaultsManager()` without mocking
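A minimal sketch of the idea, assuming a shared structural interface (the `Protocol` name and method signatures here are illustrative, not the project's actual ones):

```python
from typing import Any, Optional, Protocol, runtime_checkable

@runtime_checkable
class DefaultsProvider(Protocol):
    """Interface both managers satisfy; orchestrators depend only on this."""
    def get_default(self, namespace: str, key: str) -> Optional[Any]: ...
    def get_defaults(self, namespace: str) -> dict: ...

class NullDefaultsManager:
    """Null Object: satisfies the interface but supplies nothing."""
    def get_default(self, namespace: str, key: str) -> Optional[Any]:
        return None  # No default available

    def get_defaults(self, namespace: str) -> dict:
        return {}  # Empty namespace
```

Because both managers are valid `DefaultsProvider`s, orchestrator code calls `self.defaults_manager.get_default(...)` unconditionally and never branches on `None`.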
### PublishManager ([src/orchestrators/publish.py](publish.py))
**Purpose**: Safe asset publishing to web project directories with comprehensive security.
**Key Features**:
- Path traversal protection (canonicalize paths, containment checks)
- Filename validation (regex-based whitelist)
- Auto-detection of project root and publish directories
- Optional web optimization (WebP conversion, compression)
- Atomic file writes with manifest updates
- Thread-safe manifest operations
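The path traversal protection can be illustrated with a canonicalize-then-contain check. This is a sketch of the technique, not the actual `PublishManager` implementation:

```python
from pathlib import Path

def resolve_contained(root: Path, candidate: str) -> Path:
    """Canonicalize candidate under root and reject anything that escapes it."""
    # resolve() collapses "..", symlinks, etc. before the containment test
    resolved = (root / candidate).resolve()
    if not resolved.is_relative_to(root.resolve()):
        raise ValueError(f"Path escapes publish root: {candidate}")
    return resolved
```

The order matters: checking containment on the raw string would let `"../etc/passwd"` slip through, so the path must be resolved first.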
**Key Methods**:
```python
config = PublishConfig(
    project_root=None,        # Auto-detect
    publish_root=None,        # Auto-detect (public/gen, static/gen, etc.)
    comfyui_output_root=None  # Auto-detect with fallback
)
pub_mgr = PublishManager(config)

# Check readiness
is_ready, error_code, error_info = pub_mgr.ensure_ready()

# Resolve and validate paths
source_path = pub_mgr.resolve_source_path(subfolder="", filename="ComfyUI_00123.png")
target_path = pub_mgr.resolve_target_path(target_filename="hero-image.webp")

# Copy with optional compression
result = pub_mgr.copy_asset(
    source_path=source_path,
    target_path=target_path,
    web_optimize=True,  # Convert to WebP, compress
    max_bytes=600_000   # 600KB limit
)

# Update manifest
pub_mgr.update_manifest(manifest_key="hero_image", filename="hero-image.webp")

# Get status
info = pub_mgr.get_publish_info()
```
---
## Anti-Patterns
### ❌ Direct HTTP Calls
**Bad**:
```python
class WorkflowOrchestrator:
    def queue_workflow(self, workflow):
        # DON'T: Direct HTTP call
        response = requests.post(
            f"{self.base_url}/prompt",
            json={"prompt": workflow}
        )
        return response.json()
```
**Good**:
```python
class WorkflowOrchestrator:
    def queue_workflow(self, workflow):
        # DO: Use route function
        res = asyncio.run(queue_workflow(
            auth=self.auth,
            workflow=workflow,
        ))
        return res.response
```
### ❌ Route Logic in Orchestrator
**Bad**:
```python
class WorkflowOrchestrator:
    async def get_queue(self):
        # DON'T: Reimplementing route
        res = await get_data(
            auth=self.auth,
            method="GET",
            url=f"{self.auth.base_url}/queue",
        )
        return res
```
**Good**:
```python
class WorkflowOrchestrator:
    def get_queue(self):
        # DO: Use existing route
        res = asyncio.run(get_queue(auth=self.auth))
        return res.response
```
### ❌ Mixing Sync/Async Without Bridge
**Bad**:
```python
class WorkflowOrchestrator:
def queue_workflow(self, workflow):
# DON'T: Can't await in sync method
res = await queue_workflow(auth=self.auth, workflow=workflow)
return res
```
**Good**:
```python
class WorkflowOrchestrator:
    def queue_workflow(self, workflow):
        # DO: Use asyncio.run() bridge
        res = asyncio.run(queue_workflow(auth=self.auth, workflow=workflow))
        return res.response
```
---
## Polling Pattern
Orchestrators often need to poll for completion:
```python
import time


class WorkflowOrchestrator:
    def execute_and_wait(self, workflow: dict, timeout: int = 300) -> dict:
        """Execute workflow and poll for completion.

        Args:
            workflow: Workflow definition
            timeout: Max wait time in seconds

        Returns:
            Completed workflow result

        Raises:
            TimeoutError: If workflow doesn't complete
        """
        self.logger.info(f"Starting workflow execution (timeout={timeout}s)")

        # Queue workflow
        res = asyncio.run(queue_workflow(auth=self.auth, workflow=workflow))
        prompt_id = res.response["prompt_id"]
        self.logger.info(f"Workflow queued: {prompt_id}")

        # Poll for completion
        start_time = time.time()
        iterations = 0
        while time.time() - start_time < timeout:
            iterations += 1
            res = asyncio.run(get_prompt_history(
                auth=self.auth,
                prompt_id=prompt_id,
            ))
            history = res.response.get(prompt_id, {})
            if history.get("status", {}).get("completed", False):
                duration = time.time() - start_time
                self.logger.info(
                    f"Workflow completed in {duration:.2f}s after {iterations} polls"
                )
                return self._extract_outputs(history)

            # Log progress every 30 iterations (~30 seconds)
            if iterations % 30 == 0:
                elapsed = time.time() - start_time
                self.logger.debug(f"Still polling... {elapsed:.0f}s elapsed")
            time.sleep(1)  # Poll interval

        self.logger.error(f"Workflow timeout after {timeout}s ({iterations} polls)")
        raise TimeoutError(f"Workflow {prompt_id} timeout after {timeout}s")

    def _extract_outputs(self, history: dict) -> dict:
        """Extract outputs from history (business logic)."""
        # Parse and transform outputs
        pass
```
---
## Session Reuse Pattern
For multiple route calls, reuse session for performance:
```python
from client import httpx_session_context


class WorkflowOrchestrator:
    async def batch_execute(self, workflows: list[dict]) -> list[dict]:
        """Execute multiple workflows with session reuse."""
        results = []
        # Reuse session across all calls
        async with httpx_session_context() as session:
            for workflow in workflows:
                res = await queue_workflow(
                    auth=self.auth,
                    workflow=workflow,
                    session=session,  # Reuse connection
                )
                results.append(res.response)
        return results
```
---
## Error Handling Pattern
```python
from routes.workflow import WorkflowError
from routes.queue import QueueError


class WorkflowOrchestrator:
    def execute_workflow(self, workflow: dict) -> dict:
        """Execute with error handling and logging."""
        self.logger.info(f"Executing workflow with {len(workflow)} nodes")
        try:
            res = asyncio.run(queue_workflow(
                auth=self.auth,
                workflow=workflow,
            ))
            prompt_id = res.response["prompt_id"]
            self.logger.info(f"Workflow executed successfully: {prompt_id}")
            return {"success": True, "id": prompt_id}
        except WorkflowError as e:
            # Handle route-specific errors
            self.logger.error(f"Workflow validation error: {e}")
            return {"success": False, "error": str(e)}
        except Exception as e:
            # Handle unexpected errors with stack trace
            self.logger.error(
                f"Unexpected error executing workflow: {e}",
                exc_info=True  # Include full stack trace
            )
            return {"success": False, "error": "Internal error"}
```
---
## Dependency Injection
Orchestrators receive Auth and other dependencies:
```python
class WorkflowOrchestrator:
    def __init__(
        self,
        auth: ComfyAuth,
        asset_registry=None,
        defaults_manager=None,
    ):
        """Initialize with dependencies.

        Args:
            auth: Required auth instance
            asset_registry: Optional asset registry
            defaults_manager: Optional defaults manager
        """
        self.auth = auth
        self.asset_registry = asset_registry
        self.defaults_manager = defaults_manager
```
---
## Migration Checklist
When refactoring from old code:
- [ ] Replace `requests.*` with route function calls
- [ ] Add `auth: ComfyAuth` parameter to `__init__`
- [ ] Add `asyncio.run()` bridges for sync methods calling async routes
- [ ] Remove direct HTTP client usage
- [ ] Remove manual header construction
- [ ] Extract business logic from routes
- [ ] Add proper error handling
- [ ] Add logging
- [ ] Write unit tests with mocked routes
---
## Testing & Validation
### Test Commands
```bash
# Run all tests
pytest tests/ -v
# Run specific orchestrator tests
pytest tests/test_asset_registry.py -v
pytest tests/test_publish.py -v
# Run with coverage
pytest tests/ --cov=src/orchestrators --cov-report=html
```
### Test Strategy
**Unit Testing**:
- Mock route functions using `pytest-asyncio` and `pytest-httpx`
- Test business logic in isolation
- Verify validation, transformation, error handling
- Test sync-to-async bridging
**Integration Testing**:
- Use test ComfyUI instance (localhost:8188)
- Test full orchestrator workflows
- Verify polling and retry logic
- Test asset lifecycle management
**Test Pattern Example**:
```python
import pytest
from unittest.mock import AsyncMock, patch

from src.orchestrators.workflow import WorkflowOrchestrator
from src.auth.base import NoAuth
from src.client.response import ResponseGetData


@pytest.mark.asyncio
async def test_execute_workflow_success():
    """Test successful workflow execution."""
    # Setup
    auth = NoAuth("http://test:8188")
    orchestrator = WorkflowOrchestrator(auth, workflows_dir="/tmp/test")

    # Mock route functions
    mock_queue = AsyncMock(return_value=ResponseGetData(
        response={"prompt_id": "test-123"},
        status=200,
        is_success=True
    ))

    # Execute
    with patch('src.orchestrators.workflow.queue_workflow', mock_queue):
        result = await orchestrator.execute_workflow(
            auth=auth,
            workflow_id="test_workflow",
            parameters={"prompt": "test"}
        )

    # Verify
    assert result["prompt_id"] == "test-123"
    mock_queue.assert_called_once()
```
### Testing Best Practices
1. **Mock at the route boundary**: Mock route functions, not HTTP client
2. **Test error paths**: Verify exception handling and recovery
3. **Test timeouts**: Verify polling timeout behavior
4. **Test validation**: Ensure input validation catches invalid data
5. **Test thread safety**: For AssetOrchestrator, test concurrent access
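For point 5, a concurrency test can hammer the registry from many threads and assert nothing is lost. The sketch below uses an illustrative in-memory registry standing in for `AssetOrchestrator`'s `RLock`-guarded state:

```python
import threading

class Registry:
    """Illustrative thread-safe registry mirroring AssetOrchestrator's locking."""
    def __init__(self):
        self._lock = threading.RLock()
        self._items: dict = {}

    def register(self, key, value):
        with self._lock:  # All mutation happens under the lock
            self._items[key] = value

    def count(self) -> int:
        with self._lock:
            return len(self._items)

def test_concurrent_register():
    reg = Registry()
    # 50 threads registering distinct assets concurrently
    threads = [
        threading.Thread(target=reg.register, args=(f"asset-{i}", i))
        for i in range(50)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert reg.count() == 50  # No registrations lost to races
```

The same shape works against the real orchestrator: spawn threads calling `register_asset` and `cleanup_expired` concurrently, then assert the registry is consistent.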
---
## Related Documentation
### Architecture Guides
- **[Root AGENTS.md](../../AGENTS.md)** - Overall architecture and behavioral protocols
- **[Routes Layer](../routes/AGENTS.md)** - Route function patterns that orchestrators call
### Related Skills
- **[FastMCP v3](../../.github/skills/fastmcp-v3/SKILL.md)** - FastMCP v3 server patterns and tool registration
- **[MCP Logging](../../.github/skills/mcp-logging/SKILL.md)** - Correlation ID tracking and structured logging patterns
  - Current orchestrators use stdlib `logging` module
  - See the MCP Logging skill for structured async logging patterns
  - Examples show business logic logging in manager classes
---
**Last Updated**: January 24, 2026
**Status**: FastMCP v3 Integration Complete (Context, Resources, Null Object Pattern)