# server.py (8.18 kB)
"""High level MCP server facade."""
from __future__ import annotations

from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict, Iterable, List, Mapping, Optional, Sequence

from .assets import AssetCatalog
from .client import ComfyUIClient
from .config import ComfyUISettings
from .mutations import WorkflowMutator
from .workflow import WorkflowDiscovery, WorkflowTemplate
class ComfyUIMCPServer:
    """Encapsulates the MCP tools exposed to the LLM.

    The server wires together workflow discovery, the asset catalog and the
    ComfyUI client, and exposes coroutine methods for listing, describing,
    customizing and executing workflow templates.
    """

    def __init__(self, settings: Optional[ComfyUISettings] = None) -> None:
        """Build the collaborators from *settings*.

        Args:
            settings: Configuration to use; a default ``ComfyUISettings()`` is
                created when omitted (or falsy).

        The asset directories are validated immediately through
        ``AssetCatalog.validate_directories``.
        """
        self.settings = settings or ComfyUISettings()
        self.discovery = WorkflowDiscovery(self.settings.workflows_directory())
        self.client = ComfyUIClient(self.settings.base_url, api_key=self.settings.api_key)
        self.assets = AssetCatalog(self.settings.directories, base_path=self.settings.resolve_path())
        self.assets.validate_directories()
        self._watch_workflows = self.settings.feature_toggles.watch_workflows

    async def close(self) -> None:
        """Close the underlying ComfyUI client."""
        await self.client.close()

    async def list_workflows(self) -> List[Dict[str, Any]]:
        """Return the summary of every discovered workflow template."""
        self._maybe_refresh_workflows()
        return [template.summary() for template in self.discovery.list_templates()]

    async def list_assets(self) -> Dict[str, List[str]]:
        """Refresh the asset catalog and return its listing."""
        self.assets.refresh()
        return self.assets.list()

    async def describe_workflow(self, name: str) -> Dict[str, Any]:
        """Return the template summary extended with its graph.

        Args:
            name: Name of the workflow template.

        Returns:
            The template summary plus a ``"graph"`` entry holding the raw
            nodes (ordered by sorted node id) and the payload's ``"links"``.

        Raises:
            KeyError: If no template is registered under *name*.
        """
        # _require_template already refreshes discovery when watching is on,
        # so no separate refresh is needed here.
        template = self._require_template(name)
        nodes_by_id = template.graph.nodes_by_id
        summary = template.summary()
        summary["graph"] = {
            "nodes": [nodes_by_id[node_id].raw for node_id in sorted(nodes_by_id)],
            "links": template.payload.get("links", []),
        }
        return summary

    async def customize_workflow(self, name: str, changes: Mapping[str, Any]) -> Dict[str, Any]:
        """Apply *changes* to a template without submitting it.

        Args:
            name: Name of the workflow template.
            changes: Requested modifications (see ``_apply_changes``).

        Returns:
            A dict with the mutated ``"workflow"`` payload and the ``"diff"``
            recorded by the mutator.

        Raises:
            KeyError: If no template is registered under *name*.
            ValueError: If a value is out of bounds or an asset is unknown.
            TypeError: If a LoRA entry is not a mapping.
        """
        template = self._require_template(name)
        mutator = WorkflowMutator(template)
        self._apply_changes(mutator, changes)
        new_template = mutator.apply()
        return {
            "workflow": new_template.payload,
            "diff": mutator.diff.changes,
        }

    async def execute_workflow(
        self,
        name: str,
        changes: Optional[Mapping[str, Any]] = None,
        stream_updates: bool = False,
    ) -> Dict[str, Any]:
        """Submit a (optionally customized) workflow template to ComfyUI.

        Args:
            name: Name of the workflow template.
            changes: Optional modifications applied before submission.
            stream_updates: When true and a prompt id was returned, collect
                every payload delivered by ``client.stream_updates`` instead
                of fetching the history.

        Returns:
            A dict with ``"prompt_id"``, the raw ``"submission"`` response and
            the ``"diff"`` (empty when no changes were given), plus either
            ``"updates"`` or ``"history"`` when a prompt id is available.

        Raises:
            KeyError: If no template is registered under *name*.
            ValueError: If a value is out of bounds or an asset is unknown.
            TypeError: If a LoRA entry is not a mapping.
        """
        # _require_template already refreshes discovery when watching is on.
        template = self._require_template(name)
        mutator = WorkflowMutator(template)
        if changes:
            self._apply_changes(mutator, changes)
            template = mutator.apply()

        submission = await self.client.submit_workflow(template.payload)
        # The response may spell the key either way.
        prompt_id = submission.get("prompt_id") or submission.get("promptId")
        result: Dict[str, Any] = {
            "prompt_id": prompt_id,
            "submission": submission,
            "diff": mutator.diff.changes if changes else {},
        }
        if not prompt_id:
            return result

        if stream_updates:
            updates: List[Dict[str, Any]] = []

            async def _collector(payload: Dict[str, Any]) -> None:
                updates.append(payload)

            await self.client.stream_updates(prompt_id, _collector)
            result["updates"] = updates
        else:
            result["history"] = await self.client.get_history(prompt_id)
        return result

    @asynccontextmanager
    async def lifecycle(self) -> AsyncIterator[ComfyUIMCPServer]:
        """Async context manager that yields the server and closes it on exit."""
        try:
            yield self
        finally:
            await self.close()

    def _apply_changes(self, mutator: WorkflowMutator, changes: Mapping[str, Any]) -> None:
        """Translate a *changes* mapping into calls on *mutator*.

        Recognized keys, processed in this order: ``prompts``, ``checkpoint``,
        ``sampler`` (``cfg``, ``steps``, ``seed``), ``vae``, ``loras`` and
        ``resolution``. Entries of an unexpected type are ignored.

        Raises:
            ValueError: If a numeric value is outside the configured bounds
                or a referenced asset does not exist.
            TypeError: If an item of ``loras`` is not a mapping.
        """
        self._apply_prompts(mutator, changes.get("prompts", {}))

        checkpoint = changes.get("checkpoint")
        if isinstance(checkpoint, str):
            self._ensure_asset("checkpoints", checkpoint)
            mutator.set_checkpoint(checkpoint)

        self._apply_sampler(mutator, changes.get("sampler"))
        self._apply_vae(mutator, changes.get("vae"))
        self._apply_loras(mutator, changes.get("loras"))
        self._apply_resolution(mutator, changes.get("resolution"))

    def _apply_prompts(self, mutator: WorkflowMutator, prompts: Any) -> None:
        """Update one prompt per ``role -> text`` entry; text is stringified."""
        if not isinstance(prompts, Mapping):
            return
        for role, text in prompts.items():
            mutator.update_prompt(role, str(text))

    def _apply_sampler(self, mutator: WorkflowMutator, sampler: Any) -> None:
        """Validate and apply the ``cfg``, ``steps`` and ``seed`` sampler settings."""
        if not isinstance(sampler, Mapping):
            return
        bounds = self.settings.default_bounds

        cfg = sampler.get("cfg")
        if cfg is not None:
            cfg_value = float(cfg)
            self._validate_range("cfg", cfg_value, bounds.cfg_min, bounds.cfg_max)
            mutator.set_cfg(cfg_value)

        steps = sampler.get("steps")
        if steps is not None:
            steps_value = int(steps)
            self._validate_range("steps", steps_value, bounds.steps_min, bounds.steps_max)
            mutator.set_steps(steps_value)

        seed = sampler.get("seed")
        if seed is not None:
            mutator.set_seed(int(seed))

    def _apply_vae(self, mutator: WorkflowMutator, vae: Any) -> None:
        """Apply a VAE given either as a plain name or as a mapping.

        For a mapping the name is taken from the first truthy value among the
        ``name``, ``vae`` and ``vae_name`` keys; a mapping without one is ignored.
        """
        if isinstance(vae, Mapping):
            candidate = vae.get("name") or vae.get("vae") or vae.get("vae_name")
            if not candidate:
                return
            vae_name = str(candidate)
        elif isinstance(vae, str):
            vae_name = vae
        else:
            return
        self._ensure_asset("vaes", vae_name)
        mutator.set_vae(vae_name)

    def _apply_loras(self, mutator: WorkflowMutator, loras: Any) -> None:
        """Apply LoRA specs given as a single mapping or an iterable of mappings.

        Raises:
            TypeError: If an item of the iterable is not a mapping.
            ValueError: If a named LoRA asset does not exist.
        """
        specs: List[Mapping[str, Any]] = []
        if isinstance(loras, Mapping):
            specs = [loras]
        elif isinstance(loras, Iterable) and not isinstance(loras, (str, bytes)):
            for item in loras:
                if not isinstance(item, Mapping):
                    raise TypeError("Each LoRA configuration must be a mapping")
                specs.append(item)
        if not specs:
            return

        # Check every named asset before handing the specs to the mutator.
        for spec in specs:
            lora_name = spec.get("name") or spec.get("lora_name")
            if isinstance(lora_name, str):
                self._ensure_asset("loras", lora_name)
        mutator.configure_loras(specs)

    def _apply_resolution(self, mutator: WorkflowMutator, resolution: Any) -> None:
        """Apply a resolution given as a mapping or a ``(width, height)`` sequence.

        A mapping may carry ``width`` and/or ``height``; a sequence needs at
        least two items, either of which may be ``None`` to leave that
        dimension untouched. Shorter sequences and other types are ignored.
        """
        width: Optional[int] = None
        height: Optional[int] = None
        if isinstance(resolution, Mapping):
            if "width" in resolution:
                width = int(resolution["width"])
            if "height" in resolution:
                height = int(resolution["height"])
        elif (
            isinstance(resolution, Sequence)
            and not isinstance(resolution, (str, bytes))
            and len(resolution) >= 2
        ):
            width = int(resolution[0]) if resolution[0] is not None else None
            height = int(resolution[1]) if resolution[1] is not None else None

        bounds = self.settings.default_bounds
        if width is not None:
            self._validate_range("width", width, bounds.width_min, bounds.width_max)
        if height is not None:
            self._validate_range("height", height, bounds.height_min, bounds.height_max)
        if width is not None or height is not None:
            mutator.set_resolution(width=width, height=height)

    def _validate_range(self, name: str, value: float | int, minimum: float | int, maximum: float | int) -> None:
        """Raise ``ValueError`` unless ``minimum <= value <= maximum``.

        The check is written as a positive containment test so that NaN,
        which compares false against everything, is rejected as well.
        """
        if not (minimum <= value <= maximum):
            raise ValueError(f"{name} value {value} must be between {minimum} and {maximum}")

    def _ensure_asset(self, category: str, name: str) -> None:
        """Check that asset *name* exists in *category*.

        Raises:
            ValueError: If the catalog raises ``KeyError`` for the asset; the
                original exception is chained as the cause.
        """
        try:
            self.assets.ensure_exists(category, name)
        except KeyError as exc:
            # str(KeyError("msg")) yields the repr of the message (extra
            # quotes), so take the message from args when there is exactly one.
            message = exc.args[0] if len(exc.args) == 1 else str(exc)
            raise ValueError(str(message)) from exc

    def _require_template(self, name: str) -> WorkflowTemplate:
        """Return the template called *name*, refreshing discovery first if enabled.

        Raises:
            KeyError: If discovery has no template under *name*.
        """
        self._maybe_refresh_workflows()
        template = self.discovery.get(name)
        if template is None:
            raise KeyError(f"Unknown workflow template '{name}'")
        return template

    def _maybe_refresh_workflows(self) -> None:
        """Refresh workflow discovery when the ``watch_workflows`` toggle is on."""
        if self._watch_workflows:
            self.discovery.refresh()
# Names exported by ``from <module> import *``.
__all__ = ["ComfyUIMCPServer"]