"""Parallel-with-template 工具处理器。
处理 *_parallel_with_template 模式:
- 使用 simple_template 将 template + variables[] 渲染成 parallel_prompts/parallel_task_notes
- 然后委托给现有 ParallelHandler 执行并写入 handoff_file
"""
from __future__ import annotations
from typing import Any
from mcp.types import TextContent
from .base import ToolContext, ToolHandler
from .parallel import ParallelHandler
from ..shared.response_formatter import format_error_response
from ..utils.simple_template import (
SimpleTemplateError,
SimpleTemplateSyntaxError,
SimpleTemplateUndefinedError,
render_simple_template,
validate_simple_template,
)
__all__ = ["ParallelWithTemplateHandler"]
_MAX_TASKS = 100
_MAX_TASK_NOTE_LEN = 120
class ParallelWithTemplateHandler(ToolHandler):
"""Simple placeholder templating layer + Parallel execution layer."""
def __init__(self, base_name: str, *, tool_suffix: str = "_parallel_with_template"):
self._base_name = base_name
self._tool_suffix = tool_suffix
@property
def name(self) -> str:
return f"{self._base_name}{self._tool_suffix}"
@property
def description(self) -> str:
return (
f"Render a simple placeholder template into multiple prompts (one per variables[i]) and run them in parallel "
f"using {self._base_name}.\n\n"
"Supported syntax: {{ name }} placeholders (optionally dotted dict paths like {{ vars.file_path }}). "
"No filters / function calls / conditionals / loops.\n\n"
"Missing variables fail the render. Escapes: '{{{{' -> '{{', '}}}}' -> '}}'.\n\n"
"Auto-injected variables per task (override same-named keys in variables[i]): task_index, task_count, vars. "
"Parallel options (e.g. parallel_max_concurrency, parallel_fail_fast) still apply. "
"If compact_response=true, returns a compact summary; full output is still written to handoff_file."
)
def get_input_schema(self) -> dict[str, Any]:
from ..tool_schema import create_parallel_with_template_schema
return create_parallel_with_template_schema(self._base_name)
def validate(self, arguments: dict[str, Any]) -> str | None:
template = arguments.get("template")
variables = arguments.get("variables")
task_note_template = arguments.get("task_note_template", "")
if not isinstance(template, str) or not template.strip():
return "template is required and must be a non-empty string"
if not isinstance(variables, list) or not variables:
return "variables is required and must be a non-empty list of objects"
if len(variables) > _MAX_TASKS:
return f"variables exceeds maximum of {_MAX_TASKS}"
for i, v in enumerate(variables):
if not isinstance(v, dict):
return f"variables[{i}] must be an object/dict"
if not isinstance(task_note_template, str):
return "task_note_template must be a string"
if not arguments.get("handoff_file"):
return "handoff_file is required in parallel mode"
if not arguments.get("workspace"):
return "workspace is required"
return None
@staticmethod
def _truncate_task_note(note: str) -> str:
note = note.strip()
if len(note) <= _MAX_TASK_NOTE_LEN:
return note
return note[: _MAX_TASK_NOTE_LEN - 3] + "..."
async def handle(self, arguments: dict[str, Any], ctx: ToolContext) -> list[TextContent]:
error = self.validate(arguments)
if error:
return format_error_response(error)
template_str = (arguments.get("template") or "").strip()
variables_list: list[dict[str, Any]] = arguments.get("variables", [])
task_note_template_str = (arguments.get("task_note_template") or "").strip()
try:
validate_simple_template(template_str, template_name="template")
except SimpleTemplateSyntaxError as e:
return format_error_response(f"Template syntax error in 'template': {e}")
if task_note_template_str:
try:
validate_simple_template(task_note_template_str, template_name="task_note_template")
except SimpleTemplateSyntaxError as e:
return format_error_response(f"Template syntax error in 'task_note_template': {e}")
task_count = len(variables_list)
parallel_prompts: list[str] = []
parallel_task_notes: list[str] = []
for zero_idx, user_vars in enumerate(variables_list):
# Auto-injected variables. Use 1-based index for human-friendly templates.
task_index = zero_idx + 1
render_vars = dict(user_vars)
render_vars["task_index"] = task_index
render_vars["task_count"] = task_count
render_vars["vars"] = user_vars
try:
rendered_prompt = (render_simple_template(template_str, render_vars) or "").strip()
except SimpleTemplateUndefinedError as e:
return format_error_response(
f"Template rendering failed for variables[{zero_idx}] while rendering 'template': {e}"
)
except SimpleTemplateError as e:
return format_error_response(
f"Template rendering failed for variables[{zero_idx}] while rendering 'template': {e}"
)
if not rendered_prompt:
return format_error_response(
f"Rendered prompt is empty for variables[{zero_idx}]. "
"Check that your 'template' produces non-empty text for this variables item."
)
if task_note_template_str:
try:
rendered_note = (render_simple_template(task_note_template_str, render_vars) or "").strip()
except SimpleTemplateUndefinedError as e:
return format_error_response(
f"Template rendering failed for variables[{zero_idx}] while rendering 'task_note_template': {e}"
)
except SimpleTemplateError as e:
return format_error_response(
f"Template rendering failed for variables[{zero_idx}] while rendering 'task_note_template': {e}"
)
if not rendered_note:
return format_error_response(
f"Rendered task_note is empty for variables[{zero_idx}]. "
"Check that your 'task_note_template' produces non-empty text for this variables item."
)
task_note = rendered_note
else:
fallback = (
user_vars.get("task_note")
or user_vars.get("task")
or user_vars.get("title")
or f"task_{task_index}"
)
task_note = str(fallback).strip()
if not task_note:
task_note = f"task_{task_index}"
parallel_prompts.append(rendered_prompt)
parallel_task_notes.append(self._truncate_task_note(task_note))
derived_args = dict(arguments)
derived_args["parallel_prompts"] = parallel_prompts
derived_args["parallel_task_notes"] = parallel_task_notes
derived_args["compact_response"] = arguments.get("compact_response", False)
# Delegate to the existing parallel execution logic.
return await ParallelHandler(self._base_name).handle(derived_args, ctx)