claude_parallel
Execute multiple Claude AI tasks concurrently within a shared workspace, with results saved to a designated file for efficient batch processing.
Instructions
Run multiple claude tasks in parallel. All tasks share workspace/permission/save_file. Results are appended to save_file with XML wrappers (`<agent-output agent=... continuation_id=... task_note=... task_index=... status=...>`). Max 100 tasks. Model can be an array: a single element shared by all tasks, or one element per task.
Input Schema
JSON Schema (table view)
| Name | Required | Description | Default |
|---|---|---|---|
| workspace | Yes | Project root directory. Boundary for 'workspace-write'. Use absolute paths or relative paths. | |
| permission | No | Security level: 'read-only' (analyze files), 'workspace-write' (modify inside workspace), 'unlimited' (full system access). Default: 'read-only'. | read-only |
| save_file | Yes | PREFERRED when agent needs to write files or produce lengthy output. Output is written directly to this path, avoiding context overflow. This write is permitted even in read-only mode (server-handled). Essential for: code generation, detailed reports, documentation. | |
| report_mode | No | Generate a standalone, document-style report (no chat filler) suitable for sharing. | |
| context_paths | No | List of relevant files/dirs to preload as context hints. | |
| model | No | Model override(s). If single element, all tasks use that model. If multiple elements, must match parallel_prompts length - each task uses corresponding model. Empty array uses CLI default. | |
| system_prompt | No | Complete replacement for the default system prompt. Use only when you need full control over agent behavior. Prefer append_system_prompt for most cases. | |
| append_system_prompt | No | Additional instructions appended to the default system prompt. Recommended way to customize behavior. Example: 'Focus on performance optimization, avoid adding new dependencies' | |
| agent | No | Specify an agent for the current session (overrides the default agent setting). Use predefined agent names configured in Claude Code settings. | |
| parallel_prompts | Yes | Complete prompts for parallel execution. Each spawns an independent subprocess. | |
| parallel_task_notes | Yes | Labels for each task. Length MUST equal parallel_prompts. | |
| parallel_max_concurrency | No | Max concurrent subprocesses. | |
| parallel_fail_fast | No | Stop spawning new tasks when any fails (already running tasks continue). | |
| debug | No | Enable execution stats (tokens, duration) for this call. |
Implementation Reference
- ParallelHandler class provides the core execution logic for 'claude_parallel' tool. It creates multiple subprocesses using Claude invoker, runs them concurrently, wraps outputs in XML, and appends to save_file.class ParallelHandler(ToolHandler): """Parallel 模式工具处理器。""" def __init__(self, base_name: str): """初始化 ParallelHandler。 Args: base_name: 基础工具名称(如 codex, gemini, claude, opencode) """ self._base_name = base_name @property def name(self) -> str: return f"{self._base_name}_parallel" @property def description(self) -> str: return ( f"Run multiple {self._base_name} tasks in parallel. " f"All tasks share workspace/permission/save_file. " f"Results are appended to save_file with XML wrappers " f"(<agent-output agent=... continuation_id=... task_note=... task_index=... status=...>)." ) def get_input_schema(self) -> dict[str, Any]: from ..tool_schema import create_tool_schema return create_tool_schema(self._base_name, is_parallel=True) def validate(self, arguments: dict[str, Any]) -> str | None: prompts = arguments.get("parallel_prompts", []) task_notes = arguments.get("parallel_task_notes", []) # 类型校验 if not isinstance(prompts, list): return "parallel_prompts must be a list" if not isinstance(task_notes, list): return "parallel_task_notes must be a list" if not prompts: return "parallel_prompts is required" # 检查空白字符串和类型 for i, p in enumerate(prompts): if not isinstance(p, str): return f"parallel_prompts[{i}] must be a string" if not p or not p.strip(): return f"parallel_prompts[{i}] is empty or whitespace" for i, n in enumerate(task_notes): if not isinstance(n, str): return f"parallel_task_notes[{i}] must be a string" if not n or not n.strip(): return f"parallel_task_notes[{i}] is empty or whitespace" if len(prompts) != len(task_notes): return "parallel_prompts and parallel_task_notes must have same length" if len(prompts) > 100: return "parallel_prompts exceeds maximum of 100" if arguments.get("continuation_id"): return "continuation_id input is not supported in 
parallel mode" # model 数组校验 models = arguments.get("model", []) if isinstance(models, list) and len(models) > 1 and len(models) != len(prompts): return f"model array length ({len(models)}) must be 1 or match parallel_prompts length ({len(prompts)})" if not arguments.get("save_file"): return "save_file is required in parallel mode" return None async def handle( self, arguments: dict[str, Any], ctx: ToolContext, ) -> list[TextContent]: """处理 parallel 模式的工具调用。""" # 1) 校验 error = self.validate(arguments) if error: return format_error_response(error) prompts = arguments.get("parallel_prompts", []) task_notes = arguments.get("parallel_task_notes", []) save_file = arguments.get("save_file") # clamp concurrency (handle string/invalid types) try: max_conc = int(arguments.get("parallel_max_concurrency", 20)) except (TypeError, ValueError): max_conc = 20 max_conc = max(1, min(100, max_conc)) fail_fast = arguments.get("parallel_fail_fast", False) # 推送用户 prompt 到 GUI(每个 prompt 单独推送) for prompt, note in zip(prompts, task_notes): ctx.push_user_prompt(f"{self._base_name}_parallel", prompt, note) # 2) 构建子任务 sub_tasks = [] context_paths = arguments.get("context_paths", []) report_mode = arguments.get("report_mode", False) models = arguments.get("model", []) if not isinstance(models, list): models = [models] if models else [] for idx, (prompt, note) in enumerate(zip(prompts, task_notes), start=1): # 注入 context_paths 和 report_mode final_prompt = inject_context_and_report_mode(prompt, context_paths, report_mode) # model 分发:单个则共享,多个则按索引分配 if len(models) == 1: model = models[0] elif len(models) >= idx: model = models[idx - 1] else: model = "" sub_tasks.append({ "prompt": final_prompt, "workspace": arguments.get("workspace"), "permission": arguments.get("permission", "read-only"), "model": model, "task_note": note, "_task_index": idx, # CLI 特有参数 "image": arguments.get("image", []), # codex "system_prompt": arguments.get("system_prompt", ""), # claude "append_system_prompt": 
arguments.get("append_system_prompt", ""), # claude "agent": arguments.get("agent", ""), # claude/opencode "file": arguments.get("file", []), # opencode }) # 3) 并发执行 sem = asyncio.Semaphore(max_conc) should_stop = False results: list[tuple[int, str, Any]] = [] # (task_index, task_note, result|Exception|None) async def run_one(sub_args: dict): nonlocal should_stop async with sem: # fail_fast 检查必须在拿到 semaphore 后 if fail_fast and should_stop: return (sub_args["_task_index"], sub_args["task_note"], None) # skipped try: # 创建 invoker(传入 task_note 和 task_index 用于 GUI 显示) task_note = sub_args.get("task_note", "") task_index = sub_args.get("_task_index") event_callback = ctx.make_event_callback(self._base_name, task_note, task_index) if ctx.gui_manager else None invoker = create_invoker(self._base_name, event_callback=event_callback) # 构建参数 params = build_params(self._base_name, sub_args) # 执行 result = await invoker.execute(params) if not result.success and fail_fast: should_stop = True return (sub_args["_task_index"], sub_args["task_note"], result) except asyncio.CancelledError: # 必须 re-raise,不能当作普通异常处理 raise except Exception as e: if fail_fast: should_stop = True return (sub_args["_task_index"], sub_args["task_note"], e) start_time = time.time() try: raw_results = await asyncio.gather(*[run_one(t) for t in sub_tasks], return_exceptions=True) except asyncio.CancelledError: raise duration_sec = time.time() - start_time # 处理 gather 返回的异常 for r in raw_results: if isinstance(r, asyncio.CancelledError): raise r # re-raise 取消 elif isinstance(r, Exception): # 不应发生,因为 run_one 已捕获 continue else: results.append(r) # 4) 按 task_index 排序后串行写文件 results.sort(key=lambda x: x[0]) success_count = 0 failed_count = 0 skipped_count = 0 summary_lines = [] all_wrapped = [] # 收集所有 wrapped 内容用于返回 formatter = get_formatter() for idx, note, result in results: if result is None: # skipped (fail_fast) skipped_count += 1 summary_lines.append(f"- [{idx}] {note} | skipped") continue elif 
isinstance(result, Exception): content = f"Error: {result}" status = "error" session_id = "" failed_count += 1 summary_lines.append(f"- [{idx}] {note} | error") elif result.success: # 使用 formatter 格式化内容 response_data = ResponseData( answer=result.agent_messages, session_id=result.session_id or "", thought_steps=[], debug_info=None, success=True, error=None, ) content = formatter.format_for_file(response_data) status = "success" session_id = result.session_id or "" success_count += 1 summary_lines.append(f"- [{idx}] {note} | success | session={session_id}") else: # result.error 已包含 exit code + stderr content = result.error or "Unknown error" status = "error" session_id = result.session_id or "" failed_count += 1 summary_lines.append(f"- [{idx}] {note} | error | session={session_id}") # 构建 wrapper wrapped = build_wrapper(self._base_name, session_id, note, idx, status, content) all_wrapped.append(wrapped) # 4.5) 批量写入文件(单次 I/O 操作) if all_wrapped: try: file_path = Path(save_file) file_path.parent.mkdir(parents=True, exist_ok=True) content_to_write = "\n".join(all_wrapped) if file_path.exists(): with file_path.open("a", encoding="utf-8") as f: f.write("\n" + content_to_write) # 前置换行防止粘连 else: file_path.write_text(content_to_write, encoding="utf-8") except Exception as e: logger.error(f"Failed to write to {save_file}: {e}") return format_error_response(f"Failed to write to {save_file}: {e}") # 5) 返回 wrapped 内容(与 save_file_with_wrapper 格式一致) summary = f"Parallel run: total={len(results)}, success={success_count}, failed={failed_count}, skipped={skipped_count}\n" summary += f"Saved to: {save_file}\n" summary += "\n".join(summary_lines) # 推送结果到 GUI ctx.push_to_gui({ "category": "system", "source": f"{self._base_name}_parallel", "message": summary, "severity": "info", "content_type": "text", "timestamp": time.time(), "raw": { "type": "parallel_complete", "success": success_count, "failed": failed_count, "skipped": skipped_count, }, "metadata": { "debug": { "total_tasks": 
len(results), "success_count": success_count, "failed_count": failed_count, "skipped_count": skipped_count, "duration_sec": duration_sec, "save_file": save_file, }, }, }) # 构建 debug_info(如果 debug 开启) debug_enabled = ctx.resolve_debug(arguments) debug_info = None if debug_enabled: debug_info = FormatterDebugInfo( model=None, duration_sec=duration_sec, message_count=len(results), tool_call_count=0, ) # 返回 wrapped 内容(与 save_file_with_wrapper 格式一致) # answer 包含所有任务的 XML wrapper 输出 has_failures = failed_count > 0 wrapped_content = "\n".join(all_wrapped) if all_wrapped else summary response_data = ResponseData( answer=wrapped_content, session_id="", # parallel 模式没有单一 session_id thought_steps=[], debug_info=debug_info, success=not has_failures, error=f"{failed_count} of {len(results)} tasks failed" if has_failures else None, ) formatted_response = formatter.format(response_data, debug=debug_enabled) return [TextContent(type="text", text=formatted_response)]
- src/cli_agent_mcp/server.py:114-129 (registration)Tool registration in list_tools(): Dynamically registers 'claude_parallel' when cli_type='claude' since 'claude' in PARALLEL_SUPPORTED_TOOLS.

# Register the "<cli_type>_parallel" variant for every CLI that supports it.
if cli_type in PARALLEL_SUPPORTED_TOOLS:
    parallel_name = f"{cli_type}_parallel"
    parallel_desc = (
        f"Run multiple {cli_type} tasks in parallel. "
        f"All tasks share workspace/permission/save_file. "
        f"Results are appended to save_file with XML wrappers "
        f"(<agent-output agent=... continuation_id=... task_note=... task_index=... status=...>). "
        f"Max 100 tasks. Model can be array: single element shared by all, or one per task."
    )
    tools.append(
        Tool(
            name=parallel_name,
            description=parallel_desc,
            # Schema is generated per CLI with the parallel-specific parameters.
            inputSchema=create_tool_schema(cli_type, is_parallel=True),
        )
    )
- src/cli_agent_mcp/server.py:215-221 (registration)Handler instantiation in call_tool(): Creates ParallelHandler('claude') for 'claude_parallel' tool calls.

# Dispatch "<base_name>_parallel" calls to ParallelHandler; everything else
# goes through the regular single-task CLIHandler.
if is_parallel:
    handler = ParallelHandler(base_name)
    return await handler.handle(arguments, tool_ctx)

# CLI tools (codex, gemini, claude, opencode)
handler = CLIHandler(base_name)
return await handler.handle(arguments, tool_ctx)
- create_tool_schema('claude', is_parallel=True) generates the input schema for claude_parallel, including parallel-specific params and claude-specific params like system_prompt.def create_tool_schema(cli_type: str, is_parallel: bool = False) -> dict[str, Any]: """创建工具的 JSON Schema。 参数顺序: 1. prompt, workspace (必填) - parallel 模式下 prompt 被忽略 2. continuation_id, permission, model, save_file (常用) 3. 特有参数 (image / system_prompt / append_system_prompt / file / agent / images) 4. parallel 参数 (仅 parallel 模式) 5. task_note, debug (末尾) """ # Banana 工具使用简化的 schema(不支持 parallel) if cli_type == "banana": properties: dict[str, Any] = { "prompt": { "type": "string", "description": ( "Image generation prompt. Structure: " "<goal>what you want to generate (can be a statement)</goal> " "<context>detailed background info - the more the better</context> " "<hope>desired visual outcome, can be abstract</hope>. " "Example: <goal>Generate 6 weather icons for a mobile app</goal> " "<context>Target users are young professionals, app has a friendly casual vibe, needs to match existing UI with rounded corners</context> " "<hope>pastel colors, consistent 3px stroke, 64x64 base size</hope>" ), }, } properties.update(BANANA_PROPERTIES) properties["task_note"] = { "type": "string", "description": "Subdirectory name for saving images (English recommended, e.g., 'hero-banner', 'product-shot'). Also shown in GUI.", } return { "type": "object", "properties": properties, "required": ["prompt", "save_path", "task_note"], } # Image 工具使用简化的 schema(不支持 parallel) if cli_type == "image": properties: dict[str, Any] = { "prompt": { "type": "string", "description": ( "Image generation prompt. Structure: " "<goal>what you want to generate (can be a statement)</goal> " "<context>detailed background info - the more the better</context> " "<hope>desired visual outcome, can be abstract</hope>. 
" "Example: <goal>Create a 4-panel comic about debugging</goal> " "<context>Developer finds a bug at 3am, tries multiple fixes, finally discovers it was a typo, comedic relief for tech blog</context> " "<hope>simple black-white line art, speech bubbles, exaggerated tired expressions</hope>" ), }, } properties.update(IMAGE_PROPERTIES) properties["task_note"] = { "type": "string", "description": "Subdirectory name for saving images (English recommended, e.g., 'hero-banner', 'product-shot'). Also shown in GUI.", } return { "type": "object", "properties": properties, "required": ["prompt", "save_path", "task_note"], } # 按顺序构建 properties properties = {} # 1. 公共参数(必填 + 常用) # parallel 模式下排除 prompt, continuation_id, save_file_with_append_mode, save_file_with_wrapper, model if is_parallel: for key, value in COMMON_PROPERTIES.items(): if key in ("prompt", "continuation_id", "save_file_with_append_mode", "save_file_with_wrapper", "model"): continue properties[key] = value # parallel 模式下 model 改为数组类型 properties["model"] = { "type": "array", "items": {"type": "string"}, "default": [], "description": ( "Model override(s). If single element, all tasks use that model. " "If multiple elements, must match parallel_prompts length - each task uses corresponding model. " "Empty array uses CLI default." ), } else: properties.update(COMMON_PROPERTIES) # 2. 特有参数 if cli_type == "codex": properties.update(CODEX_PROPERTIES) elif cli_type == "claude": properties.update(CLAUDE_PROPERTIES) elif cli_type == "opencode": properties.update(OPENCODE_PROPERTIES) # 3. Parallel 参数(仅 parallel 模式) if is_parallel: properties.update(PARALLEL_PROPERTIES) # 4. 
末尾参数(parallel 模式下排除 task_note) if is_parallel: properties["debug"] = TAIL_PROPERTIES["debug"] else: properties.update(TAIL_PROPERTIES) # 构建 required 字段 if is_parallel: required = ["workspace", "save_file", "parallel_prompts", "parallel_task_notes"] else: required = ["prompt", "workspace"] return { "type": "object", "properties": properties, "required": required, }
- PARALLEL_SUPPORTED_TOOLS list includes 'claude', enabling parallel variant.

# CLI tools that support parallel mode.
PARALLEL_SUPPORTED_TOOLS = ["codex", "gemini", "claude", "opencode"]