
batch_execute

Execute multiple mathematical operations in one request with automatic dependency chaining for multi-step calculations like financial models and statistical workflows.

Instructions

Execute multiple math operations in a single request with automatic dependency chaining.

USE THIS TOOL when you need 2+ calculations where outputs feed into inputs (bond pricing, statistical workflows, multi-step formulas). Don't make sequential individual tool calls.

Benefits: 90-95% token reduction versus equivalent sequential individual calls, a single API round trip, and highly flexible workflows.

Quick Start

Available tools (20):

  • Basic: calculate, percentage, round, convert_units

  • Arrays: array_operations, array_statistics, array_aggregate, array_transform

  • Statistics: statistics, pivot_table, correlation

  • Financial: financial_calcs, compound_interest, perpetuity

  • Linear Algebra: matrix_operations, solve_linear_system, matrix_decomposition

  • Calculus: derivative, integral, limits_series

Result referencing:

Pass $op_id.result directly in any parameter:

  • $op_id.result - Use output from prior operation

  • $op_id.result[0] - Array indexing

  • $op_id.metadata.field - Nested fields

Example: "payment": "$coupon.result" or "variables": {"x": "$op1.result"}

Example - Bond valuation:

{ "operations": [ {"id": "coupon", "tool": "calculate", "context": "Calculate annual coupon payment", "arguments": {"expression": "principal * 0.04", "variables": {"principal": 8306623.86}}}, {"id": "fv", "tool": "financial_calcs", "context": "Future value of coupon payments", "arguments": {"calculation": "fv", "rate": 0.04, "periods": 10, "payment": "$coupon.result", "present_value": 0}}, {"id": "total", "tool": "calculate", "context": "Total bond maturity value", "arguments": {"expression": "fv + principal", "variables": {"fv": "$fv.result", "principal": 8306623.86}}} ], "execution_mode": "auto", "output_mode": "minimal", "context": "Bond A 10-year valuation" }

When to Use

✅ Multi-step calculations (financial models, statistics, transformations)
✅ Data pipelines where step N needs output from step N-1
✅ Any workflow requiring 2+ operations from the tools above

❌ Single standalone calculation
❌ Need to inspect/validate intermediate results before proceeding

Execution Modes

  • auto (recommended): DAG-based optimization, parallel where possible

  • sequential: Strict order

  • parallel: All concurrent (only if truly independent)
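
For contrast with the bond example above, here is a sketch of a batch whose operations share no $ references: auto detects the independence and schedules both in one wave, while parallel runs them concurrently unconditionally. The payload is shown as a Python dict; the calculate arguments mirror the bond example.

# Two independent operations: neither references the other via "$",
# so they can run concurrently under "parallel" or "auto".
independent_batch = {
    "operations": [
        {"id": "area", "tool": "calculate",
         "arguments": {"expression": "r * r * 3.14159", "variables": {"r": 2.5}}},
        {"id": "growth", "tool": "calculate",
         "arguments": {"expression": "base * 1.07", "variables": {"base": 1200}}},
    ],
    "execution_mode": "auto",
    "output_mode": "value",
}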

Output Modes

  • full: Complete metadata (default)

  • compact: Remove nulls/whitespace

  • minimal: Basic operation objects with values

  • value: Flat {id: value} map (~90% smaller) - use this for most cases

  • final: Sequential chains only, returns terminal result (~95% smaller)
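
To make the size difference concrete: for a two-step chain where op "a" computes 2 + 3 and op "b" multiplies $a.result by 10, value mode collapses the response to roughly the following (an illustrative shape, based on the descriptions above):

# "full" returns per-operation objects with status, timing, wave, and
# dependency metadata; "value" reduces all of that to a flat {id: value} map:
value_mode_response = {"a": 5, "b": 50}
# "final" goes further still and returns only the terminal result: 50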

Structure

Each operation:

  • tool: Tool name (required)

  • arguments: Tool parameters (required)

  • id: Unique identifier (auto-generated if omitted)

  • context: Optional label for this operation

The batch-level context parameter labels the entire workflow and is preserved across all output modes.

Response includes: per-operation status, result/error, execution_time_ms, dependency wave, summary stats.
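
Based on the OperationResult fields in the implementation reference below, one entry of the results array looks roughly like this under full output mode (values are illustrative):

# Shape of one entry in the "results" array, as built by OperationResult.
operation_result = {
    "id": "coupon",
    "tool": "calculate",
    "status": "success",        # or "error" / "timeout"
    "result": {"result": 332264.95, "context": "Calculate annual coupon payment"},
    "execution_time_ms": 1.8,
    "wave": 0,                  # dependency wave the operation ran in
    "dependencies": [],         # operation IDs referenced via "$..."
    "label": None,
}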

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| context | No | Optional annotation to label this calculation (e.g., 'Bond A PV', 'Q2 revenue'). Appears in results for easy identification. | — |
| output_mode | No | Output format: full (default), compact, minimal, value, or final. See batch_execute tool for details. | full |
| operations | Yes | List of operations to execute. Each operation MUST include: tool (name), arguments (dict). Optional: id (UUID/string), context, label, timeout_ms (int) | — |
| execution_mode | No | Execution strategy: sequential (order), parallel (concurrent), auto (DAG-based) | auto |
| max_concurrent | No | Maximum concurrent operations (applies to parallel/auto modes) | 5 |
| stop_on_error | No | Whether to stop execution on first error. If False, independent operations continue even if others fail. | false |

Input Schema (JSON Schema)

{ "$defs": { "BatchOperation": { "description": "Single operation within a batch request.\n\nRepresents one tool call with its arguments, dependencies, and metadata.\nOperations are executed according to their dependencies and the selected execution mode.", "properties": { "arguments": { "additionalProperties": true, "description": "Tool arguments as key-value pairs matching the tool's parameter signature", "type": "object" }, "context": { "anyOf": [ { "maxLength": 1000, "type": "string" }, { "type": "null" } ], "default": null, "description": "Operation-specific context annotation (e.g., 'Bond A valuation')" }, "id": { "description": "Unique operation identifier (auto-generated UUID if not provided)", "maxLength": 200, "minLength": 1, "type": "string" }, "label": { "anyOf": [ { "maxLength": 200, "type": "string" }, { "type": "null" } ], "default": null, "description": "Human-readable label for this operation (displayed in results)" }, "timeout_ms": { "anyOf": [ { "maximum": 300000, "minimum": 100, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Operation-specific timeout in milliseconds (100ms - 300s)" }, "tool": { "description": "Tool name (must match one of the 19 available mathematical tools)", "maxLength": 100, "minLength": 1, "type": "string" } }, "required": [ "tool", "arguments" ], "type": "object" } }, "properties": { "context": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Optional annotation to label this calculation (e.g., 'Bond A PV', 'Q2 revenue'). Appears in results for easy identification." }, "execution_mode": { "default": "auto", "description": "Execution strategy: sequential (order), parallel (concurrent), auto (DAG-based)", "enum": [ "sequential", "parallel", "auto" ], "type": "string" }, "max_concurrent": { "default": 5, "description": "Maximum concurrent operations (applies to parallel/auto modes)", "maximum": 20, "minimum": 1, "type": "integer" }, "operations": { "description": "List of operations to execute. Each operation MUST include: tool (name), arguments (dict). Optional: id (UUID/string), context, label, timeout_ms (int)", "items": { "$ref": "#/$defs/BatchOperation" }, "maxItems": 100, "minItems": 1, "type": "array" }, "output_mode": { "default": "full", "description": "Output format: full (default), compact, minimal, value, or final. See batch_execute tool for details.", "enum": [ "full", "compact", "minimal", "value", "final" ], "type": "string" }, "stop_on_error": { "default": false, "description": "Whether to stop execution on first error. If False, independent operations continue even if others fail.", "type": "boolean" } }, "required": [ "operations" ], "type": "object" }

Implementation Reference

  • The core handler function for the 'batch_execute' tool. It validates inputs, builds a tool registry, creates a BatchExecutor, executes the batch, and returns a JSON response with results and summary.
async def batch_execute(
    operations: Annotated[
        List[BatchOperation],
        Field(
            description=(
                "List of operations to execute. Each operation MUST include: "
                "tool (name), arguments (dict). Optional: id (UUID/string), context, label, "
                "timeout_ms (int)"
            ),
            min_length=1,
            max_length=100,
        ),
    ],
    execution_mode: Annotated[
        Literal["sequential", "parallel", "auto"],
        Field(
            description="Execution strategy: sequential (order), parallel (concurrent), auto (DAG-based)"
        ),
    ] = "auto",
    max_concurrent: Annotated[
        int,
        Field(
            description="Maximum concurrent operations (applies to parallel/auto modes)",
            ge=1,
            le=20,
        ),
    ] = 5,
    stop_on_error: Annotated[
        bool,
        Field(
            description=(
                "Whether to stop execution on first error. "
                "If False, independent operations continue even if others fail."
            )
        ),
    ] = False,
) -> str:
    """Execute batch of mathematical operations with dependency management.

    This tool orchestrates multiple tool calls in a single request, automatically
    detecting dependencies and executing operations in optimal parallel waves.

    Each operation is tracked by its unique ID, providing crystal-clear mapping
    between inputs and outputs for easy LLM consumption and debugging.

    Returns:
        JSON string with results array and execution summary
    """
    try:
        # Build tool registry from wrapped tools (supports context/output_mode)
        tool_registry = await _build_tool_registry_async()

        # Validate tool names
        for op in operations:
            if op.tool not in tool_registry:
                available = ", ".join(sorted(tool_registry.keys()))
                raise ValueError(
                    f"Unknown tool '{op.tool}' in operation '{op.id}'. Available tools: {available}"
                )

        # Create executor
        executor = BatchExecutor(
            operations=operations,
            tool_registry=tool_registry,
            mode=execution_mode,
            max_concurrent=max_concurrent,
            stop_on_error=stop_on_error,
        )

        # Execute batch
        response: BatchResponse = await executor.execute()

        # Convert to JSON
        # Note: CustomMCP will inject batch-level context at top level
        return json.dumps(
            {
                "results": [result.model_dump() for result in response.results],
                "summary": response.summary.model_dump(),
            },
            indent=2,
            default=str,
        )

    except Exception as e:
        # Return structured error response
        return json.dumps(
            {
                "error": {
                    "type": type(e).__name__,
                    "message": str(e),
                    "tool": "batch_execute",
                },
                "results": [],  # No partial results on batch-level error
            },
            indent=2,
        )
  • The @mcp.tool decorator that registers the 'batch_execute' tool with the MCP server, including detailed description, input schema annotations, and metadata.
@mcp.tool(
    name="batch_execute",
    description=f"""Execute multiple math operations in a single request with automatic dependency chaining.

**USE THIS TOOL when you need 2+ calculations where outputs feed into inputs**
(bond pricing, statistical workflows, multi-step formulas). Don't make sequential individual tool calls.

Benefits: 90-95% token reduction, single API call, highly flexible workflows

## Quick Start

{_generate_tool_reference()}

**Result referencing:**

Pass `$op_id.result` directly in any parameter:
- `$op_id.result` - Use output from prior operation
- `$op_id.result[0]` - Array indexing
- `$op_id.metadata.field` - Nested fields

Example: `"payment": "$coupon.result"` or `"variables": {{"x": "$op1.result"}}`

**Example - Bond valuation:**
```json
{{
  "operations": [
    {{"id": "coupon", "tool": "calculate", "context": "Calculate annual coupon payment",
      "arguments": {{"expression": "principal * 0.04", "variables": {{"principal": 8306623.86}}}}}},
    {{"id": "fv", "tool": "financial_calcs", "context": "Future value of coupon payments",
      "arguments": {{"calculation": "fv", "rate": 0.04, "periods": 10, "payment": "$coupon.result", "present_value": 0}}}},
    {{"id": "total", "tool": "calculate", "context": "Total bond maturity value",
      "arguments": {{"expression": "fv + principal", "variables": {{"fv": "$fv.result", "principal": 8306623.86}}}}}}
  ],
  "execution_mode": "auto",
  "output_mode": "minimal",
  "context": "Bond A 10-year valuation"
}}
```

## When to Use

✅ Multi-step calculations (financial models, statistics, transformations)
✅ Data pipelines where step N needs output from step N-1
✅ Any workflow requiring 2+ operations from the tools above

❌ Single standalone calculation
❌ Need to inspect/validate intermediate results before proceeding

## Execution Modes

- `auto` (recommended): DAG-based optimization, parallel where possible
- `sequential`: Strict order
- `parallel`: All concurrent (only if truly independent)

## Output Modes

- `full`: Complete metadata (default)
- `compact`: Remove nulls/whitespace
- `minimal`: Basic operation objects with values
- `value`: Flat {{id: value}} map (~90% smaller) - **use this for most cases**
- `final`: Sequential chains only, returns terminal result (~95% smaller)

## Structure

Each operation:
- `tool`: Tool name (required)
- `arguments`: Tool parameters (required)
- `id`: Unique identifier (auto-generated if omitted)
- `context`: Optional label for this operation

Batch-level `context` parameter labels entire workflow across all output modes.

Response includes: per-operation status, result/error, execution_time_ms, dependency wave, summary stats.
""",
    annotations=ToolAnnotations(
        title="Batch Execute",
        readOnlyHint=True,
    ),
)
  • Pydantic model defining the schema for a single BatchOperation, used in the operations list input to batch_execute.
class BatchOperation(BaseModel):
    """Single operation within a batch request.

    Represents one tool call with its arguments, dependencies, and metadata.
    Operations are executed according to their dependencies and the selected
    execution mode.
    """

    id: str = Field(
        default_factory=lambda: str(uuid4()),
        description="Unique operation identifier (auto-generated UUID if not provided)",
        min_length=1,
        max_length=200,
    )
    tool: str = Field(
        description="Tool name (must match one of the 19 available mathematical tools)",
        min_length=1,
        max_length=100,
    )
    arguments: Dict[str, Any] = Field(
        description="Tool arguments as key-value pairs matching the tool's parameter signature"
    )
    context: Optional[str] = Field(
        default=None,
        description="Operation-specific context annotation (e.g., 'Bond A valuation')",
        max_length=1000,
    )
    label: Optional[str] = Field(
        default=None,
        description="Human-readable label for this operation (displayed in results)",
        max_length=200,
    )
    timeout_ms: Optional[int] = Field(
        default=None,
        description="Operation-specific timeout in milliseconds (100ms - 300s)",
        ge=100,
        le=300000,
    )

    @field_validator('tool')
    @classmethod
    def validate_tool_name(cls, v: str) -> str:
        """Validate tool exists in registry.

        This will be checked at runtime when the tool registry is available.
        Static validation happens in the batch_execute tool itself.
        """
        return v

    @field_validator('id')
    @classmethod
    def validate_id_format(cls, v: str) -> str:
        """Validate operation ID format (no special chars that break references)."""
        import re
        if not re.match(r'^[a-zA-Z0-9_-]+$', v):
            raise ValueError(
                f"Operation ID '{v}' contains invalid characters. "
                "Only letters, numbers, underscores, and hyphens are allowed."
            )
        return v
  • Pydantic models for BatchResponse, OperationResult, and BatchSummary defining the output schema of batch_execute.
class BatchResponse(BaseModel):
    """Complete batch execution response.

    Note: The 'context' field (batch-level) is injected by CustomMCP's
    transformation layer and appears at the top level of the JSON response.
    """

    results: List[OperationResult] = Field(
        description="Results for each operation in execution order"
    )
    summary: BatchSummary = Field(description="Batch execution summary statistics")
  • The BatchExecutor class implementing the core logic for executing batches: dependency graph construction (DAG), topological sorting, parallel wave execution, result resolution, and error handling.
class BatchExecutor:
    """Execute batch operations with intelligent dependency management.

    Supports three execution modes:
    - sequential: Operations execute in order specified
    - parallel: All operations execute concurrently (ignoring dependencies)
    - auto: Build DAG from dependencies and execute in optimal wave-based manner

    Uses Python's graphlib.TopologicalSorter for dependency resolution and
    asyncio for parallel execution within each wave.
    """

    def __init__(
        self,
        operations: List[BatchOperation],
        tool_registry: Dict[str, Any],  # Tool functions (async callables)
        mode: Literal["sequential", "parallel", "auto"] = "auto",
        max_concurrent: int = 5,
        stop_on_error: bool = False,
    ):
        """Initialise batch executor.

        Args:
            operations: List of operations to execute
            tool_registry: Map of tool_name -> async function
            mode: Execution mode (sequential, parallel, auto)
            max_concurrent: Maximum concurrent operations
            stop_on_error: Whether to halt on first error
        """
        self.operations = {op.id: op for op in operations}
        self.tool_registry = tool_registry
        self.mode: Literal["sequential", "parallel", "auto"] = mode
        self.max_concurrent = max_concurrent
        self.stop_on_error = stop_on_error

        # Results storage
        self.results: Dict[str, Dict[str, Any]] = {}  # For dependency resolution
        self.operation_results: List[OperationResult] = []  # Final results
        self.errors: Dict[str, Exception] = {}

        # Timing
        self.start_time: float = 0
        self.num_waves: int = 0

    async def execute(self) -> BatchResponse:
        """Execute all operations and return complete batch response.

        Returns:
            BatchResponse with results and summary
        """
        self.start_time = time.time()

        # Execute based on mode
        if self.mode == "sequential":
            await self._execute_sequential()
        elif self.mode == "parallel":
            await self._execute_parallel()
        else:  # auto
            await self._execute_auto()

        # Build response
        return self._build_response()

    async def _execute_sequential(self) -> None:
        """Execute operations in exact order specified (index order)."""
        # Sort by creation order (Python 3.7+ dict maintains insertion order)
        op_ids = list(self.operations.keys())

        for wave_num, op_id in enumerate(op_ids):
            op = self.operations[op_id]
            result = await self._execute_operation(op, wave=wave_num)
            self.operation_results.append(result)

            if result.status == "error" and self.stop_on_error:
                break

        self.num_waves = len(self.operation_results)

    async def _execute_parallel(self) -> None:
        """Execute all operations in parallel (ignore dependencies)."""
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def bounded_execute(op: BatchOperation) -> OperationResult:
            async with semaphore:
                return await self._execute_operation(op, wave=0)

        # Create tasks for all operations
        tasks = [bounded_execute(op) for op in self.operations.values()]

        # Execute all in parallel
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        for result in results:
            if isinstance(result, Exception):
                # Shouldn't happen as _execute_operation catches exceptions,
                # but handle just in case
                continue
            if isinstance(result, OperationResult):
                self.operation_results.append(result)

        self.num_waves = 1

    async def _execute_auto(self) -> None:
        """Execute with dependency-aware parallelization using DAG.

        Uses TopologicalSorter to identify execution waves. Operations
        within a wave execute in parallel.
        """
        # Build dependency graph
        try:
            sorter = self._build_dependency_graph()
            sorter.prepare()
        except CycleError as e:
            # Extract cycle information
            cycle = e.args[1] if len(e.args) > 1 else []
            raise ValueError(
                f"Circular dependency detected in operations: {' -> '.join(cycle)}. "
                "Operations cannot depend on themselves directly or indirectly."
            )

        wave_num = 0
        semaphore = asyncio.Semaphore(self.max_concurrent)

        # Execute wave by wave
        while sorter.is_active():
            ready_ids = list(sorter.get_ready())

            if not ready_ids:
                # This shouldn't happen with a valid DAG, but be defensive
                break

            # Execute current wave in parallel
            async def bounded_execute(op_id: str) -> OperationResult:
                async with semaphore:
                    op = self.operations[op_id]
                    return await self._execute_operation(op, wave=wave_num)

            tasks = [bounded_execute(op_id) for op_id in ready_ids]
            wave_results = await asyncio.gather(*tasks, return_exceptions=True)

            # Process wave results
            should_stop = False
            for op_id, result in zip(ready_ids, wave_results):
                if isinstance(result, Exception):
                    # Unexpected exception (shouldn't happen)
                    self.errors[op_id] = result
                    if self.stop_on_error:
                        should_stop = True
                elif isinstance(result, OperationResult):
                    self.operation_results.append(result)
                    if result.status == "error" and self.stop_on_error:
                        should_stop = True

                # Mark operation as done for topological sorter
                sorter.done(op_id)

            if should_stop:
                break

            wave_num += 1

        self.num_waves = wave_num if self.operation_results else 0

    def _build_dependency_graph(self) -> TopologicalSorter:
        """Build DAG from operation dependencies.

        Returns:
            TopologicalSorter configured with operation dependencies

        Raises:
            ValueError: If dependencies reference non-existent operations
        """
        graph: Dict[str, List[str]] = {}

        for op_id, op in self.operations.items():
            # Scan arguments for $refs to detect dependencies
            deps = self._extract_refs_from_value(op.arguments)
            # Convert set to list for TopologicalSorter
            graph[op_id] = list(deps)

        # Validate all dependencies exist
        all_op_ids = set(self.operations.keys())
        for op_id, dep_list in graph.items():
            invalid_deps = set(dep_list) - all_op_ids
            if invalid_deps:
                raise ValueError(
                    f"Operation '{op_id}' has dependencies on non-existent operations: "
                    f"{', '.join(sorted(invalid_deps))}. "
                    f"Available operation IDs: {', '.join(sorted(all_op_ids))}"
                )

        return TopologicalSorter(graph)

    def _extract_refs_from_value(self, value: Any) -> Set[str]:
        """Recursively extract $operation_id references from any value."""
        refs: Set[str] = set()

        if isinstance(value, str) and value.startswith('$'):
            # Extract operation ID from $op_id or $op_id.path
            op_id = value.split('.')[0][1:]  # Remove $ and take first part
            refs.add(op_id)
        elif isinstance(value, dict):
            for v in value.values():
                refs.update(self._extract_refs_from_value(v))
        elif isinstance(value, list):
            for item in value:
                refs.update(self._extract_refs_from_value(item))

        return refs

    async def _execute_operation(
        self, op: BatchOperation, wave: int
    ) -> OperationResult:
        """Execute a single operation with timing and error handling.

        Args:
            op: Operation to execute
            wave: Execution wave number (for metadata)

        Returns:
            OperationResult with status, result/error, and metadata
        """
        start_time = time.time()

        try:
            # Resolve arguments with dependencies
            resolved_args = self._resolve_arguments(op)

            # Get wrapped tool instance (not raw function)
            if op.tool not in self.tool_registry:
                raise ValueError(
                    f"Tool '{op.tool}' not found in registry. "
                    f"Available tools: {', '.join(sorted(self.tool_registry.keys()))}"
                )
            tool = self.tool_registry[op.tool]

            # Execute tool.run() with arguments dict
            if op.timeout_ms:
                tool_result = await asyncio.wait_for(
                    tool.run(resolved_args), timeout=op.timeout_ms / 1000
                )
            else:
                tool_result = await tool.run(resolved_args)

            # Extract text content from ToolResult
            from mcp.types import TextContent

            if tool_result.content and isinstance(tool_result.content[0], TextContent):
                raw_result = tool_result.content[0].text
            else:
                raise ValueError(
                    f"Unexpected tool result format from {op.tool}. "
                    f"Expected TextContent, got {type(tool_result.content[0]) if tool_result.content else 'no content'}"
                )

            # Parse JSON result
            result_data = json.loads(raw_result)

            # Inject operation-level context if provided
            if op.context:
                result_data['context'] = op.context

            # Store result for dependency resolution
            self.results[op.id] = result_data

            # Calculate execution time
            execution_time = (time.time() - start_time) * 1000

            return OperationResult(
                id=op.id,
                tool=op.tool,
                status="success",
                result=result_data,
                execution_time_ms=execution_time,
                wave=wave,
                dependencies=list(self._extract_refs_from_value(op.arguments)),
                label=op.label,
            )

        except asyncio.TimeoutError:
            execution_time = (time.time() - start_time) * 1000
            return OperationResult(
                id=op.id,
                tool=op.tool,
                status="timeout",
                error={
                    "type": "TimeoutError",
                    "message": f"Operation exceeded {op.timeout_ms}ms timeout",
                    "tool": op.tool,
                },
                execution_time_ms=execution_time,
                wave=wave,
                dependencies=list(self._extract_refs_from_value(op.arguments)),
                label=op.label,
            )

        except Exception as e:
            execution_time = (time.time() - start_time) * 1000
            self.errors[op.id] = e
            return OperationResult(
                id=op.id,
                tool=op.tool,
                status="error",
                error={
                    "type": type(e).__name__,
                    "message": str(e),
                    "tool": op.tool,
                },
                execution_time_ms=execution_time,
                wave=wave,
                dependencies=list(self._extract_refs_from_value(op.arguments)),
                label=op.label,
            )

    def _resolve_arguments(self, op: BatchOperation) -> Dict[str, Any]:
        """Resolve operation arguments with result references.

        Resolves $refs in arguments. Handles precedence for context and
        output_mode parameters.

        Args:
            op: Operation to resolve arguments for

        Returns:
            Fully resolved arguments dictionary

        Raises:
            ValueError: If references cannot be resolved
        """
        resolver = ResultResolver(self.results)

        # Start with base arguments
        resolved = op.arguments.copy()

        # Handle context precedence: operation-level > arguments-level.
        # If operation has context at operation level, remove from arguments
        # (operation-level takes precedence and will be injected after execution)
        if op.context and 'context' in resolved:
            del resolved['context']

        # Always remove output_mode from arguments to prevent double transformation.
        # The batch-level output_mode controls the entire response format
        if 'output_mode' in resolved:
            del resolved['output_mode']

        # Resolve all $refs in argument values
        resolved = resolver.resolve(resolved)

        return resolved

    def _build_response(self) -> BatchResponse:
        """Build complete batch response with results and summary.

        Returns:
            BatchResponse with all results and execution statistics
        """
        total_time = (time.time() - self.start_time) * 1000

        # Count successes and failures
        succeeded = sum(1 for r in self.operation_results if r.status == "success")
        failed = sum(1 for r in self.operation_results if r.status in ["error", "timeout"])

        summary = BatchSummary(
            total_operations=len(self.operations),
            succeeded=succeeded,
            failed=failed,
            total_execution_time_ms=total_time,
            execution_mode=self.mode,
            num_waves=self.num_waves,
            max_concurrent=self.max_concurrent,
        )

        return BatchResponse(results=self.operation_results, summary=summary)
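
Finally, a minimal sketch of calling the tool from a Python MCP client over stdio using the official mcp SDK. The launch command is hypothetical; substitute however you run this server.

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main() -> None:
    # Hypothetical launch command -- adjust to your setup
    params = StdioServerParameters(command="uvx", args=["vibe-math-mcp"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # A two-step chain: op "b" consumes op "a"'s result via "$a.result"
            result = await session.call_tool(
                "batch_execute",
                arguments={
                    "operations": [
                        {"id": "a", "tool": "calculate",
                         "arguments": {"expression": "2 + 3"}},
                        {"id": "b", "tool": "calculate",
                         "arguments": {"expression": "x * 10",
                                       "variables": {"x": "$a.result"}}},
                    ],
                    "output_mode": "value",
                },
            )
            print(result.content[0].text)

asyncio.run(main())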
