Skip to main content
Glama

execute_on_group

Execute shell commands simultaneously across multiple servers in a defined group to manage distributed systems efficiently.

Instructions

Execute a shell command on all servers in a group (parallel execution).

Args: group: Group name (e.g. 'dicentra-prod', 'infra'). Use list_groups to see available groups. command: Shell command to execute on all servers in the group. timeout: Per-server command timeout in seconds. Default 30. working_dir: Remote directory to execute from on each server. fail_fast: If true, stop on first failure. Default false (run all).

Returns: Formatted summary of results from all servers in the group.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
groupYes
commandYes
timeoutNo
working_dirNo
fail_fastNo

Implementation Reference

  • Core handler implementation in SSHManager class. Gets servers from a group via registry, executes commands in parallel with a semaphore (max 10 concurrent), handles fail_fast mode to cancel remaining tasks on first failure, and returns a list of ExecResult objects.
    async def execute_on_group(
        self,
        group_name: str,
        command: str,
        timeout: int = 30,
        working_dir: str | None = None,
        fail_fast: bool = False,
        force: bool = False,
    ) -> list[ExecResult]:
        """Execute command on all servers in a group in parallel.
    
        Args:
            group_name: Group name from registry
            command: Command to execute
            timeout: Command timeout in seconds
            working_dir: Working directory for command execution
            fail_fast: Cancel remaining tasks on first failure
            force: Bypass dangerous command detection (use with caution)
    
        Returns:
            List of ExecResult, one per server in the group
        """
        try:
            servers = self.registry.servers_in_group(group_name)
    
            if not servers:
                logger.warning(f"Group '{group_name}' has no servers")
                return []
    
            # Limit concurrent connections
            semaphore = asyncio.Semaphore(10)
    
            async def execute_with_semaphore(server: ServerConfig) -> ExecResult:
                async with semaphore:
                    return await self.execute(
                        server.name, command, timeout, working_dir, force
                    )
    
            # Execute in parallel
            tasks = [execute_with_semaphore(server) for server in servers]
    
            if fail_fast:
                # Cancel remaining tasks on first failure
                actual_tasks = [asyncio.create_task(coro) for coro in tasks]
                results = []
                for future in asyncio.as_completed(actual_tasks):
                    result = await future
                    results.append(result)
                    if result.error or (
                        result.exit_code is not None and result.exit_code != 0
                    ):
                        for task in actual_tasks:
                            if not task.done():
                                task.cancel()
                        break
                return results
            else:
                # Wait for all tasks to complete
                results = await asyncio.gather(*tasks, return_exceptions=True)
    
                # Convert exceptions to ExecResult
                normalized_results = []
                for i, result in enumerate(results):
                    if isinstance(result, Exception):
                        server_name = servers[i].name
                        normalized_results.append(
                            ExecResult(
                                server=server_name,
                                command=command,
                                stdout="",
                                stderr="",
                                exit_code=None,
                                error=f"Exception during execution: {result}",
                            )
                        )
                    else:
                        normalized_results.append(result)
    
                return normalized_results
    
        except KeyError as e:
            logger.error(f"Group not found: {group_name}")
            return [
                ExecResult(
                    server=group_name,
                    command=command,
                    stdout="",
                    stderr="",
                    exit_code=None,
                    error=f"Group not found: {e}",
                )
            ]
    
        except Exception as e:
            logger.error(f"Unexpected error in group execution: {e}")
            return [
                ExecResult(
                    server=group_name,
                    command=command,
                    stdout="",
                    stderr="",
                    exit_code=None,
                    error=f"Unexpected error: {e}",
                )
            ]
  • MCP tool registration with @mcp.tool() decorator. Exposes execute_on_group to MCP clients with parameter definitions (group, command, timeout, working_dir, fail_fast) and calls the SSHManager handler, then formats results.
    @mcp.tool()
    async def execute_on_group(
        group: str,
        command: str,
        timeout: int = 30,
        working_dir: str | None = None,
        fail_fast: bool = False,
    ) -> str:
        """Execute a shell command on all servers in a group (parallel execution).
    
        Args:
            group: Group name (e.g. 'dicentra-prod', 'infra'). Use list_groups to see
                   available groups.
            command: Shell command to execute on all servers in the group.
            timeout: Per-server command timeout in seconds. Default 30.
            working_dir: Remote directory to execute from on each server.
            fail_fast: If true, stop on first failure. Default false (run all).
    
        Returns:
            Formatted summary of results from all servers in the group.
        """
        try:
            _init()
    
            results = await _ssh.execute_on_group(
                group, command, timeout, working_dir, fail_fast
            )
            return format_group_results(results, group)
    
        except Exception as e:
            logger.error(f"Error executing command on group {group}: {e}")
            return f"Error executing command on group {group}: {e}"
  • ExecResult dataclass defining the result structure with fields: server, command, stdout, stderr, exit_code, error, and duration_ms.
    @dataclass
    class ExecResult:
        """Result from executing a command on a remote server.
    
        Mutable to allow construction during execution.
    
        Attributes:
            server: Server name where command was executed
            command: The command that was executed
            stdout: Standard output captured from command
            stderr: Standard error captured from command
            exit_code: Process exit code (None if execution failed)
            error: Error message if execution failed
            duration_ms: Command execution duration in milliseconds
        """
    
        server: str
        command: str
        stdout: str
        stderr: str
        exit_code: int | None
        error: str | None = None
        duration_ms: int = 0
  • format_group_results helper function that formats multiple ExecResult objects into a human-readable summary with success/failure counts per server.
    def format_group_results(results: list[ExecResult], group_name: str) -> str:
        """Format multiple command execution results from a group.
    
        Args:
            results: List of execution results to format
            group_name: Name of the group that was executed
    
        Returns:
            Formatted text showing all results with a summary
    
        Example:
            >>> results = [
            ...     ExecResult("web1", "uptime", "up 142 days", "", 0, None, 150),
            ...     ExecResult("web2", "uptime", "up 89 days", "", 0, None, 89),
            ... ]
            >>> print(format_group_results(results, "prod"))
            Executing on group 'prod' (2 servers)...
            <BLANKLINE>
            [web1] (exit 0, 150ms)
            up 142 days
            <BLANKLINE>
            [web2] (exit 0, 89ms)
            up 89 days
            <BLANKLINE>
            Summary: 2 succeeded, 0 failed
        """
        if not results:
            return (
                f"Executing on group '{group_name}' (0 servers)...\n\nNo servers in group."
            )
    
        lines = [
            f"Executing on group '{group_name}' ({len(results)} servers)...",
            "",
        ]
    
        # Track success/failure counts
        succeeded = 0
        failed = 0
    
        # Format each result
        for result in results:
            if result.error:
                failed += 1
                lines.append(f"[{result.server}] ERROR: {result.error}")
            else:
                if result.exit_code == 0:
                    succeeded += 1
                else:
                    failed += 1
    
                exit_code = result.exit_code if result.exit_code is not None else "unknown"
                lines.append(
                    f"[{result.server}] (exit {exit_code}, {result.duration_ms}ms)"
                )
    
                # Add stdout if present
                if result.stdout:
                    lines.append(result.stdout)
    
                # Add stderr if present (with label)
                if result.stderr:
                    lines.append(f"STDERR: {result.stderr}")
    
            lines.append("")
    
        # Add summary
        lines.append(f"Summary: {succeeded} succeeded, {failed} failed")
    
        return "\n".join(lines)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/blackaxgit/ssh-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server