execute_on_group
Execute shell commands simultaneously across multiple servers in a defined group to manage distributed systems efficiently.
Instructions
Execute a shell command on all servers in a group (parallel execution).
Args: group: Group name (e.g. 'dicentra-prod', 'infra'). Use list_groups to see available groups. command: Shell command to execute on all servers in the group. timeout: Per-server command timeout in seconds. Default 30. working_dir: Remote directory to execute from on each server. fail_fast: If true, stop on first failure. Default false (run all).
Returns: Formatted summary of results from all servers in the group.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
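| group | Yes | Group name (e.g. 'dicentra-prod', 'infra'). Use list_groups to see available groups. | |
| command | Yes | Shell command to execute on all servers in the group. | |
| timeout | No | Per-server command timeout in seconds. | 30 |
| working_dir | No | Remote directory to execute from on each server. | |
| fail_fast | No | If true, stop on first failure; otherwise run on all servers. | false |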
Implementation Reference
- src/ssh_mcp/ssh.py:275-379 (handler) Core handler implementation in the SSHManager class. Gets the servers in a group via the registry, executes the command on them in parallel under a semaphore (max 10 concurrent), cancels the remaining tasks on the first failure when fail_fast is set, and returns a list of ExecResult objects.
async def execute_on_group(
    self,
    group_name: str,
    command: str,
    timeout: int = 30,
    working_dir: str | None = None,
    fail_fast: bool = False,
    force: bool = False,
) -> list[ExecResult]:
    """Execute command on all servers in a group in parallel.

    At most 10 servers are contacted at the same time. An exception raised
    while running the command on one server is turned into an ExecResult
    for that server (``exit_code=None``, ``error`` set) instead of aborting
    the whole group run.

    Args:
        group_name: Group name from registry
        command: Command to execute
        timeout: Command timeout in seconds
        working_dir: Working directory for command execution
        fail_fast: Cancel remaining tasks on first failure
        force: Bypass dangerous command detection (use with caution)

    Returns:
        Without ``fail_fast``: one ExecResult per server, in the order the
        registry returned the servers.
        With ``fail_fast``: the results that completed, in completion order,
        up to and including the first failed one; the remaining tasks are
        cancelled and have finished unwinding before this method returns.
        An empty list if the group has no servers.
        If the group is unknown or an unexpected error occurs, a single
        ExecResult whose ``server`` is the group name and whose ``error``
        describes the problem.
    """
    max_concurrent = 10

    def failure(server_name: str, exc: BaseException) -> ExecResult:
        # One place that defines what a per-server execution error looks like.
        return ExecResult(
            server=server_name,
            command=command,
            stdout="",
            stderr="",
            exit_code=None,
            error=f"Exception during execution: {exc}",
        )

    try:
        servers = self.registry.servers_in_group(group_name)
        if not servers:
            logger.warning(f"Group '{group_name}' has no servers")
            return []

        # Limit concurrent connections
        semaphore = asyncio.Semaphore(max_concurrent)

        async def run_on_server(server: ServerConfig) -> ExecResult:
            async with semaphore:
                try:
                    return await self.execute(
                        server.name, command, timeout, working_dir, force
                    )
                except Exception as exc:
                    # Attribute the error to this server here, where its name
                    # is known. CancelledError is a BaseException and is
                    # deliberately not caught, so cancellation still works.
                    return failure(server.name, exc)

        if fail_fast:
            pending = [
                asyncio.create_task(run_on_server(server)) for server in servers
            ]
            results = []
            try:
                for next_done in asyncio.as_completed(pending):
                    result = await next_done
                    results.append(result)
                    if result.error or (
                        result.exit_code is not None and result.exit_code != 0
                    ):
                        break
            finally:
                # Runs on first failure, on normal completion and on any
                # error: never leave sibling tasks running in the background.
                for task in pending:
                    if not task.done():
                        task.cancel()
                # Wait for cancelled tasks to unwind so they have released
                # their resources before we return.
                await asyncio.gather(*pending, return_exceptions=True)
            return results

        # Wait for all tasks to complete; gather keeps the input order.
        outcomes = await asyncio.gather(
            *(run_on_server(server) for server in servers),
            return_exceptions=True,
        )
        # Anything still reported as an exception here is a BaseException
        # (e.g. a cancelled child); normalise it like any other failure.
        return [
            failure(server.name, outcome)
            if isinstance(outcome, BaseException)
            else outcome
            for server, outcome in zip(servers, outcomes)
        ]
    except KeyError as e:
        logger.error(f"Group not found: {group_name}")
        return [
            ExecResult(
                server=group_name,
                command=command,
                stdout="",
                stderr="",
                exit_code=None,
                error=f"Group not found: {e}",
            )
        ]
    except Exception as e:
        logger.error(f"Unexpected error in group execution: {e}")
        return [
            ExecResult(
                server=group_name,
                command=command,
                stdout="",
                stderr="",
                exit_code=None,
                error=f"Unexpected error: {e}",
            )
        ]

# - src/ssh_mcp/server.py:212-243 (registration) MCP tool registration with @mcp.tool() decorator. Exposes execute_on_group to MCP clients with parameter definitions (group, command, timeout, working_dir, fail_fast) and calls the SSHManager handler, then formats results.
@mcp.tool()
async def execute_on_group(
    group: str,
    command: str,
    timeout: int = 30,
    working_dir: str | None = None,
    fail_fast: bool = False,
) -> str:
    """Execute a shell command on all servers in a group (parallel execution).

    Args:
        group: Group name (e.g. 'dicentra-prod', 'infra'). Use list_groups to see available groups.
        command: Shell command to execute on all servers in the group.
        timeout: Per-server command timeout in seconds. Default 30.
        working_dir: Remote directory to execute from on each server.
        fail_fast: If true, stop on first failure. Default false (run all).

    Returns:
        Formatted summary of results from all servers in the group.
    """
    # NOTE(review): the docstring wording is kept as-is on purpose; it appears
    # to be what MCP clients see as the tool description - confirm before editing.
    try:
        _init()
        group_results = await _ssh.execute_on_group(
            group, command, timeout, working_dir, fail_fast
        )
        # Formatting stays inside the try so a formatting error is reported
        # to the client as text, the same way an execution error is.
        return format_group_results(group_results, group)
    except Exception as exc:
        # This tool never raises: any failure is logged and returned as the
        # tool's text result.
        failure_text = f"Error executing command on group {group}: {exc}"
        logger.error(failure_text)
        return failure_text

# - src/ssh_mcp/models.py:78-100 (schema) ExecResult dataclass defining the result structure with fields: server, command, stdout, stderr, exit_code, error, and duration_ms.
@dataclass
class ExecResult:
    """Outcome of running one command on one remote server.

    Instances are mutable so a result can be filled in while the command
    is still being executed.

    Attributes:
        server: Name of the server the command ran on
        command: The command that was run
        stdout: Captured standard output
        stderr: Captured standard error
        exit_code: Process exit code, or None if execution failed
        error: Error message if execution failed, otherwise None
        duration_ms: How long the command took, in milliseconds
    """

    # Required fields, in positional order.
    server: str
    command: str
    stdout: str
    stderr: str
    exit_code: int | None
    # Optional fields: no error and zero duration unless the caller sets them.
    error: str | None = None
    duration_ms: int = 0

# - src/ssh_mcp/formatting.py:151-220 (helper) format_group_results helper function that formats multiple ExecResult objects into a human-readable summary with success/failure counts per server.
def format_group_results(results: list[ExecResult], group_name: str) -> str:
    """Render the results of a group run as one text report.

    The report has a header line, one section per result followed by a blank
    line, and a closing summary. A result with ``error`` set is shown as an
    ERROR line and counted as failed. Any other result shows its exit code
    ("unknown" when it is None) and duration, followed by stdout and a
    labelled stderr when they are non-empty; it counts as succeeded only
    when the exit code is 0.

    Args:
        results: List of execution results to format
        group_name: Name of the group that was executed

    Returns:
        Formatted text showing all results with a summary

    Example:
        >>> results = [
        ...     ExecResult("web1", "uptime", "up 142 days", "", 0, None, 150),
        ...     ExecResult("web2", "uptime", "up 89 days", "", 0, None, 89),
        ... ]
        >>> print(format_group_results(results, "prod"))
        Executing on group 'prod' (2 servers)...
        <BLANKLINE>
        [web1] (exit 0, 150ms)
        up 142 days
        <BLANKLINE>
        [web2] (exit 0, 89ms)
        up 89 days
        <BLANKLINE>
        Summary: 2 succeeded, 0 failed
    """
    if not results:
        return (
            f"Executing on group '{group_name}' (0 servers)...\n\nNo servers in group."
        )

    report = [f"Executing on group '{group_name}' ({len(results)} servers)...", ""]
    # Every result is either a success or a failure, so only successes are
    # counted and the failure total is derived at the end.
    ok_count = 0

    for res in results:
        if res.error:
            report.append(f"[{res.server}] ERROR: {res.error}")
        else:
            if res.exit_code == 0:
                ok_count += 1
            shown_code = "unknown" if res.exit_code is None else res.exit_code
            report.append(f"[{res.server}] (exit {shown_code}, {res.duration_ms}ms)")
            if res.stdout:
                report.append(res.stdout)
            if res.stderr:
                report.append(f"STDERR: {res.stderr}")
        # Blank separator line after each server's section.
        report.append("")

    report.append(
        f"Summary: {ok_count} succeeded, {len(results) - ok_count} failed"
    )
    return "\n".join(report)