Skip to main content
Glama

BashCommand

Execute bash commands to run CLI operations, manage background processes, and check command status within the mcp-wcgw server environment.

Instructions

  • Execute a bash command. This is stateful (beware with subsequent calls).

  • Status of the command and the current working directory will always be returned at the end.

  • The first or the last line might be (...truncated) if the output is too long.

  • Always run pwd if you get any file or directory not found error to make sure you're not lost.

  • Do not run bg commands using "&", instead use this tool.

  • You must not use echo/cat to read/write files, use ReadFiles/FileWriteOrEdit

  • In order to check status of previous command, use status_check with empty command argument.

  • Only command is allowed to run at a time. You need to wait for any previous command to finish before running a new one.

  • Programs don't hang easily, so most likely explanation for no output is usually that the program is still running, and you need to check status again.

  • Do not send Ctrl-c before checking for status till 10 minutes or whatever is appropriate for the program to finish.

  • Only run long running commands in background. Each background command is run in a new non-reusable shell.

  • On running a bg command you'll get a bg command id that you should use to get status or interact.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
action_jsonYes
wait_for_secondsNo
thread_idYes

Implementation Reference

  • Core handler that executes BashCommand by interacting with pexpect.spawn shell process, handles sending commands/text/special keys, status checks, incremental output rendering, timeouts, background commands, etc.
    def execute_bash(
        bash_state: BashState,
        enc: EncoderDecoder[int],
        bash_arg: BashCommand,
        max_tokens: Optional[int],  # This will be noncoding_max_tokens
        timeout_s: Optional[float],
    ) -> tuple[str, float]:
        try:
            # Check if the thread_id matches current
            if bash_arg.thread_id != bash_state.current_thread_id:
                # Try to load state from the thread_id
                if not bash_state.load_state_from_thread_id(bash_arg.thread_id):
                    return (
                        f"Error: No saved bash state found for thread_id `{bash_arg.thread_id}`. Please initialize first with this ID.",
                        0.0,
                    )
    
            output, cost = _execute_bash(bash_state, enc, bash_arg, max_tokens, timeout_s)
    
            # Remove echo if it's a command
            if isinstance(bash_arg.action_json, Command):
                command = bash_arg.action_json.command.strip()
                if output.startswith(command):
                    output = output[len(command) :]
    
        finally:
            bash_state.run_bg_expect_thread()
            if bash_state.over_screen:
                thread = threading.Thread(
                    target=cleanup_orphaned_wcgw_screens,
                    args=(bash_state.console,),
                    daemon=True,
                )
                thread.start()
        return output, cost
    
    
    def assert_single_statement(command: str) -> None:
        # Check for multiple statements using the bash statement parser
        if "\n" in command:
            try:
                parser = BashStatementParser()
                statements = parser.parse_string(command)
            except Exception:
                # Fall back to simple newline check if something goes wrong
                raise ValueError(
                    "Command should not contain newline character in middle. Run only one command at a time."
                )
            if len(statements) > 1:
                raise ValueError(
                    "Error: Command contains multiple statements. Please run only one bash statement at a time."
                )
    
    
    def get_bg_running_commandsinfo(bash_state: BashState) -> str:
        msg = ""
        running = []
        for id_, state in bash_state.background_shells.items():
            running.append(f"Command: {state.last_command}, bg_command_id: {id_}")
        if running:
            msg = (
                "Following background commands are attached:\n" + "\n".join(running) + "\n"
            )
        else:
            msg = "No command running in background.\n"
        return msg
    
    
    def _execute_bash(
        bash_state: BashState,
        enc: EncoderDecoder[int],
        bash_arg: BashCommand,
        max_tokens: Optional[int],  # This will be noncoding_max_tokens
        timeout_s: Optional[float],
    ) -> tuple[str, float]:
        try:
            is_interrupt = False
            command_data = bash_arg.action_json
            is_bg = False
            og_bash_state = bash_state
    
            if not isinstance(command_data, Command) and command_data.bg_command_id:
                if command_data.bg_command_id not in bash_state.background_shells:
                    error = f"No shell found running with command id {command_data.bg_command_id}.\n"
                    if bash_state.background_shells:
                        error += get_bg_running_commandsinfo(bash_state)
                    if bash_state.state == "pending":
                        error += f"On the main thread a command is already running ({bash_state.last_command})"
                    else:
                        error += "On the main thread no command is running."
                    raise Exception(error)
                bash_state = bash_state.background_shells[command_data.bg_command_id]
                is_bg = True
    
            if isinstance(command_data, Command):
                if bash_state.bash_command_mode.allowed_commands == "none":
                    return "Error: BashCommand not allowed in current mode", 0.0
    
                bash_state.console.print(f"$ {command_data.command}")
    
                command = command_data.command.strip()
    
                assert_single_statement(command)
    
                if command_data.is_background:
                    bash_state = bash_state.start_new_bg_shell(bash_state.cwd)
                    is_bg = True
    
                if bash_state.state == "pending":
                    raise ValueError(WAITING_INPUT_MESSAGE)
    
                bash_state.clear_to_run()
                for i in range(0, len(command), 64):
                    bash_state.send(command[i : i + 64], set_as_command=None)
                bash_state.send(bash_state.linesep, set_as_command=command)
            elif isinstance(command_data, StatusCheck):
                bash_state.console.print("Checking status")
                if bash_state.state != "pending":
                    error = "No running command to check status of.\n"
                    error += get_bg_running_commandsinfo(bash_state)
                    return error, 0.0
    
            elif isinstance(command_data, SendText):
                if not command_data.send_text:
                    return "Failure: send_text cannot be empty", 0.0
    
                bash_state.console.print(f"Interact text: {command_data.send_text}")
                for i in range(0, len(command_data.send_text), 128):
                    bash_state.send(
                        command_data.send_text[i : i + 128], set_as_command=None
                    )
                bash_state.send(bash_state.linesep, set_as_command=None)
    
            elif isinstance(command_data, SendSpecials):
                if not command_data.send_specials:
                    return "Failure: send_specials cannot be empty", 0.0
    
                bash_state.console.print(
                    f"Sending special sequence: {command_data.send_specials}"
                )
                for char in command_data.send_specials:
                    if char == "Key-up":
                        bash_state.send("\033[A", set_as_command=None)
                    elif char == "Key-down":
                        bash_state.send("\033[B", set_as_command=None)
                    elif char == "Key-left":
                        bash_state.send("\033[D", set_as_command=None)
                    elif char == "Key-right":
                        bash_state.send("\033[C", set_as_command=None)
                    elif char == "Enter":
                        bash_state.send("\x0d", set_as_command=None)
                    elif char == "Ctrl-c":
                        bash_state.sendintr()
                        is_interrupt = True
                    elif char == "Ctrl-d":
                        bash_state.sendintr()
                        is_interrupt = True
                    elif char == "Ctrl-z":
                        bash_state.send("\x1a", set_as_command=None)
                    else:
                        raise Exception(f"Unknown special character: {char}")
    
            elif isinstance(command_data, SendAscii):
                if not command_data.send_ascii:
                    return "Failure: send_ascii cannot be empty", 0.0
    
                bash_state.console.print(
                    f"Sending ASCII sequence: {command_data.send_ascii}"
                )
                for ascii_char in command_data.send_ascii:
                    bash_state.send(chr(ascii_char), set_as_command=None)
                    if ascii_char == 3:
                        is_interrupt = True
            else:
                raise ValueError(f"Unknown command type: {type(command_data)}")
    
        except KeyboardInterrupt:
            bash_state.sendintr()
            bash_state.expect(bash_state.prompt)
            return "---\n\nFailure: user interrupted the execution", 0.0
    
        wait = min(timeout_s or CONFIG.timeout, CONFIG.timeout_while_output)
        index = bash_state.expect([bash_state.prompt, pexpect.TIMEOUT], timeout=wait)
        if index == 1:
            text = bash_state.before or ""
            incremental_text = _incremental_text(text, bash_state.pending_output)
    
            second_wait_success = False
            if is_status_check(bash_arg):
                # There's some text in BashInteraction mode wait for TIMEOUT_WHILE_OUTPUT
                remaining = CONFIG.timeout_while_output - wait
                patience = CONFIG.output_wait_patience
                if not incremental_text:
                    patience -= 1
                itext = incremental_text
                while remaining > 0 and patience > 0:
                    index = bash_state.expect(
                        [bash_state.prompt, pexpect.TIMEOUT], timeout=wait
                    )
                    if index == 0:
                        second_wait_success = True
                        break
                    else:
                        _itext = bash_state.before or ""
                        _itext = _incremental_text(_itext, bash_state.pending_output)
                        if _itext != itext:
                            patience = 3
                        else:
                            patience -= 1
                        itext = _itext
    
                    remaining = remaining - wait
    
                if not second_wait_success:
                    text = bash_state.before or ""
                    incremental_text = _incremental_text(text, bash_state.pending_output)
    
            if not second_wait_success:
                bash_state.set_pending(text)
    
                tokens = enc.encoder(incremental_text)
    
                if max_tokens and len(tokens) >= max_tokens:
                    incremental_text = "(...truncated)\n" + enc.decoder(
                        tokens[-(max_tokens - 1) :]
                    )
    
                if is_interrupt:
                    incremental_text = (
                        incremental_text
                        + """---
    ----
    Failure interrupting.
    You may want to try Ctrl-c again or program specific exit interactive commands.
        """
                    )
    
                exit_status = get_status(bash_state, is_bg)
                incremental_text += exit_status
                if is_bg and bash_state.state == "repl":
                    try:
                        bash_state.cleanup()
                        og_bash_state.background_shells.pop(bash_state.current_thread_id)
                    except Exception as e:
                        bash_state.console.log(f"error while cleaning up {e}")
    
                return incremental_text, 0
    
        before = str(bash_state.before)
    
        output = _incremental_text(before, bash_state.pending_output)
        bash_state.set_repl()
    
        tokens = enc.encoder(output)
        if max_tokens and len(tokens) >= max_tokens:
            output = "(...truncated)\n" + enc.decoder(tokens[-(max_tokens - 1) :])
    
        try:
            exit_status = get_status(bash_state, is_bg)
            output += exit_status
            if is_bg and bash_state.state == "repl":
                try:
                    bash_state.cleanup()
                    og_bash_state.background_shells.pop(bash_state.current_thread_id)
                except Exception as e:
                    bash_state.console.log(f"error while cleaning up {e}")
        except ValueError:
            bash_state.console.print(output)
            bash_state.console.print(traceback.format_exc())
            bash_state.console.print("Malformed output, restarting shell", style="red")
            # Malformed output, restart shell
            bash_state.reset_shell()
            output = "(exit shell has restarted)"
        return output, 0
  • MCP server tool handler that parses arguments into BashCommand model and calls get_tool_output for execution.
    @server.call_tool()  # type: ignore
    async def handle_call_tool(
        name: str, arguments: dict[str, Any] | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        global BASH_STATE
        if not arguments:
            raise ValueError("Missing arguments")
    
        tool_type = which_tool_name(name)
        tool_call = parse_tool_by_name(name, arguments)
    
        try:
            assert BASH_STATE
            output_or_dones, _ = get_tool_output(
                Context(BASH_STATE, BASH_STATE.console),
                tool_call,
                default_enc,
                0.0,
                lambda x, y: ("", 0),
                24000,  # coding_max_tokens
                8000,  # noncoding_max_tokens
            )
    
        except Exception as e:
            output_or_dones = [f"GOT EXCEPTION while calling tool. Error: {e}"]
    
        content: list[types.TextContent | types.ImageContent | types.EmbeddedResource] = []
        for output_or_done in output_or_dones:
            if isinstance(output_or_done, str):
                if issubclass(tool_type, Initialize):
                    # Prepare the original hardcoded message
                    original_message = """
    - Additional important note: as soon as you encounter "The user has chosen to disallow the tool call.", immediately stop doing everything and ask user for the reason.
    
    Initialize call done.
        """
    
                    # If custom instructions exist, prepend them to the original message
                    if CUSTOM_INSTRUCTIONS:
                        output_or_done += f"\n{CUSTOM_INSTRUCTIONS}\n{original_message}"
                    else:
                        output_or_done += original_message
    
                content.append(types.TextContent(type="text", text=output_or_done))
            else:
                content.append(
                    types.ImageContent(
                        type="image",
                        data=output_or_done.data,
                        mimeType=output_or_done.media_type,
                    )
                )
    
        return content
  • Pydantic models defining the input schema for BashCommand tool, using ActionJsonSchema union types, overrides JSON schema to use BashCommandOverride for LLM compatibility.
    class BashCommandOverride(BaseModel):
        action_json: ActionJsonSchema
        wait_for_seconds: Optional[float] = None
        thread_id: str
    
    
    class BashCommand(BaseModel):
        action_json: Command | StatusCheck | SendText | SendSpecials | SendAscii
        wait_for_seconds: Optional[float] = None
        thread_id: str
    
        def model_post_init(self, __context: Any) -> None:
            self.thread_id = normalize_thread_id(self.thread_id)
            return super().model_post_init(__context)
    
        @staticmethod
        def model_json_schema(*args, **kwargs) -> dict[str, Any]:  # type: ignore
            return BashCommandOverride.model_json_schema(*args, **kwargs)
  • MCP server registration: lists all tools including BashCommand using pre-defined TOOL_PROMPTS.
    @server.list_tools()  # type: ignore
    async def handle_list_tools() -> list[types.Tool]:
        """
        List available tools.
        Each tool specifies its arguments using JSON Schema validation.
        """
    
        return TOOL_PROMPTS
  • Registers BashCommand tool schema, description, and annotations in TOOL_PROMPTS used by MCP server.
        Tool(
            inputSchema=remove_titles_from_schema(BashCommand.model_json_schema()),
            name="BashCommand",
            description="""
    - Execute a bash command. This is stateful (beware with subsequent calls).
    - Status of the command and the current working directory will always be returned at the end.
    - The first or the last line might be `(...truncated)` if the output is too long.
    - Always run `pwd` if you get any file or directory not found error to make sure you're not lost.
    - Do not run bg commands using "&", instead use this tool.
    - You must not use echo/cat to read/write files, use ReadFiles/FileWriteOrEdit
    - In order to check status of previous command, use `status_check` with empty command argument.
    - Only command is allowed to run at a time. You need to wait for any previous command to finish before running a new one.
    - Programs don't hang easily, so most likely explanation for no output is usually that the program is still running, and you need to check status again.
    - Do not send Ctrl-c before checking for status till 10 minutes or whatever is appropriate for the program to finish.
    - Only run long running commands in background. Each background command is run in a new non-reusable shell.
    - On running a bg command you'll get a bg command id that you should use to get status or interact.
    """,
            annotations=ToolAnnotations(destructiveHint=True, openWorldHint=True),
        ),

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rusiaaman/wcgw'

If you have feedback or need assistance with the MCP directory API, please join our Discord server