BashCommand
Execute bash commands to run CLI operations, manage background processes, and check command status within the mcp-wcgw server environment.
Instructions
Execute a bash command. This is stateful (beware with subsequent calls).
Status of the command and the current working directory will always be returned at the end.
The first or the last line might be
(...truncated)if the output is too long.Always run
pwdif you get any file or directory not found error to make sure you're not lost.Do not run bg commands using "&", instead use this tool.
You must not use echo/cat to read/write files, use ReadFiles/FileWriteOrEdit
In order to check status of previous command, use
status_checkwith empty command argument.Only command is allowed to run at a time. You need to wait for any previous command to finish before running a new one.
Programs don't hang easily, so most likely explanation for no output is usually that the program is still running, and you need to check status again.
Do not send Ctrl-c before checking for status till 10 minutes or whatever is appropriate for the program to finish.
Only run long running commands in background. Each background command is run in a new non-reusable shell.
On running a bg command you'll get a bg command id that you should use to get status or interact.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| action_json | Yes | ||
| wait_for_seconds | No | ||
| thread_id | Yes |
Implementation Reference
- Core handler that executes BashCommand by interacting with pexpect.spawn shell process, handles sending commands/text/special keys, status checks, incremental output rendering, timeouts, background commands, etc.def execute_bash( bash_state: BashState, enc: EncoderDecoder[int], bash_arg: BashCommand, max_tokens: Optional[int], # This will be noncoding_max_tokens timeout_s: Optional[float], ) -> tuple[str, float]: try: # Check if the thread_id matches current if bash_arg.thread_id != bash_state.current_thread_id: # Try to load state from the thread_id if not bash_state.load_state_from_thread_id(bash_arg.thread_id): return ( f"Error: No saved bash state found for thread_id `{bash_arg.thread_id}`. Please initialize first with this ID.", 0.0, ) output, cost = _execute_bash(bash_state, enc, bash_arg, max_tokens, timeout_s) # Remove echo if it's a command if isinstance(bash_arg.action_json, Command): command = bash_arg.action_json.command.strip() if output.startswith(command): output = output[len(command) :] finally: bash_state.run_bg_expect_thread() if bash_state.over_screen: thread = threading.Thread( target=cleanup_orphaned_wcgw_screens, args=(bash_state.console,), daemon=True, ) thread.start() return output, cost def assert_single_statement(command: str) -> None: # Check for multiple statements using the bash statement parser if "\n" in command: try: parser = BashStatementParser() statements = parser.parse_string(command) except Exception: # Fall back to simple newline check if something goes wrong raise ValueError( "Command should not contain newline character in middle. Run only one command at a time." ) if len(statements) > 1: raise ValueError( "Error: Command contains multiple statements. Please run only one bash statement at a time." ) def get_bg_running_commandsinfo(bash_state: BashState) -> str: msg = "" running = [] for id_, state in bash_state.background_shells.items(): running.append(f"Command: {state.last_command}, bg_command_id: {id_}") if running: msg = ( "Following background commands are attached:\n" + "\n".join(running) + "\n" ) else: msg = "No command running in background.\n" return msg def _execute_bash( bash_state: BashState, enc: EncoderDecoder[int], bash_arg: BashCommand, max_tokens: Optional[int], # This will be noncoding_max_tokens timeout_s: Optional[float], ) -> tuple[str, float]: try: is_interrupt = False command_data = bash_arg.action_json is_bg = False og_bash_state = bash_state if not isinstance(command_data, Command) and command_data.bg_command_id: if command_data.bg_command_id not in bash_state.background_shells: error = f"No shell found running with command id {command_data.bg_command_id}.\n" if bash_state.background_shells: error += get_bg_running_commandsinfo(bash_state) if bash_state.state == "pending": error += f"On the main thread a command is already running ({bash_state.last_command})" else: error += "On the main thread no command is running." raise Exception(error) bash_state = bash_state.background_shells[command_data.bg_command_id] is_bg = True if isinstance(command_data, Command): if bash_state.bash_command_mode.allowed_commands == "none": return "Error: BashCommand not allowed in current mode", 0.0 bash_state.console.print(f"$ {command_data.command}") command = command_data.command.strip() assert_single_statement(command) if command_data.is_background: bash_state = bash_state.start_new_bg_shell(bash_state.cwd) is_bg = True if bash_state.state == "pending": raise ValueError(WAITING_INPUT_MESSAGE) bash_state.clear_to_run() for i in range(0, len(command), 64): bash_state.send(command[i : i + 64], set_as_command=None) bash_state.send(bash_state.linesep, set_as_command=command) elif isinstance(command_data, StatusCheck): bash_state.console.print("Checking status") if bash_state.state != "pending": error = "No running command to check status of.\n" error += get_bg_running_commandsinfo(bash_state) return error, 0.0 elif isinstance(command_data, SendText): if not command_data.send_text: return "Failure: send_text cannot be empty", 0.0 bash_state.console.print(f"Interact text: {command_data.send_text}") for i in range(0, len(command_data.send_text), 128): bash_state.send( command_data.send_text[i : i + 128], set_as_command=None ) bash_state.send(bash_state.linesep, set_as_command=None) elif isinstance(command_data, SendSpecials): if not command_data.send_specials: return "Failure: send_specials cannot be empty", 0.0 bash_state.console.print( f"Sending special sequence: {command_data.send_specials}" ) for char in command_data.send_specials: if char == "Key-up": bash_state.send("\033[A", set_as_command=None) elif char == "Key-down": bash_state.send("\033[B", set_as_command=None) elif char == "Key-left": bash_state.send("\033[D", set_as_command=None) elif char == "Key-right": bash_state.send("\033[C", set_as_command=None) elif char == "Enter": bash_state.send("\x0d", set_as_command=None) elif char == "Ctrl-c": bash_state.sendintr() is_interrupt = True elif char == "Ctrl-d": bash_state.sendintr() is_interrupt = True elif char == "Ctrl-z": bash_state.send("\x1a", set_as_command=None) else: raise Exception(f"Unknown special character: {char}") elif isinstance(command_data, SendAscii): if not command_data.send_ascii: return "Failure: send_ascii cannot be empty", 0.0 bash_state.console.print( f"Sending ASCII sequence: {command_data.send_ascii}" ) for ascii_char in command_data.send_ascii: bash_state.send(chr(ascii_char), set_as_command=None) if ascii_char == 3: is_interrupt = True else: raise ValueError(f"Unknown command type: {type(command_data)}") except KeyboardInterrupt: bash_state.sendintr() bash_state.expect(bash_state.prompt) return "---\n\nFailure: user interrupted the execution", 0.0 wait = min(timeout_s or CONFIG.timeout, CONFIG.timeout_while_output) index = bash_state.expect([bash_state.prompt, pexpect.TIMEOUT], timeout=wait) if index == 1: text = bash_state.before or "" incremental_text = _incremental_text(text, bash_state.pending_output) second_wait_success = False if is_status_check(bash_arg): # There's some text in BashInteraction mode wait for TIMEOUT_WHILE_OUTPUT remaining = CONFIG.timeout_while_output - wait patience = CONFIG.output_wait_patience if not incremental_text: patience -= 1 itext = incremental_text while remaining > 0 and patience > 0: index = bash_state.expect( [bash_state.prompt, pexpect.TIMEOUT], timeout=wait ) if index == 0: second_wait_success = True break else: _itext = bash_state.before or "" _itext = _incremental_text(_itext, bash_state.pending_output) if _itext != itext: patience = 3 else: patience -= 1 itext = _itext remaining = remaining - wait if not second_wait_success: text = bash_state.before or "" incremental_text = _incremental_text(text, bash_state.pending_output) if not second_wait_success: bash_state.set_pending(text) tokens = enc.encoder(incremental_text) if max_tokens and len(tokens) >= max_tokens: incremental_text = "(...truncated)\n" + enc.decoder( tokens[-(max_tokens - 1) :] ) if is_interrupt: incremental_text = ( incremental_text + """--- ---- Failure interrupting. You may want to try Ctrl-c again or program specific exit interactive commands. """ ) exit_status = get_status(bash_state, is_bg) incremental_text += exit_status if is_bg and bash_state.state == "repl": try: bash_state.cleanup() og_bash_state.background_shells.pop(bash_state.current_thread_id) except Exception as e: bash_state.console.log(f"error while cleaning up {e}") return incremental_text, 0 before = str(bash_state.before) output = _incremental_text(before, bash_state.pending_output) bash_state.set_repl() tokens = enc.encoder(output) if max_tokens and len(tokens) >= max_tokens: output = "(...truncated)\n" + enc.decoder(tokens[-(max_tokens - 1) :]) try: exit_status = get_status(bash_state, is_bg) output += exit_status if is_bg and bash_state.state == "repl": try: bash_state.cleanup() og_bash_state.background_shells.pop(bash_state.current_thread_id) except Exception as e: bash_state.console.log(f"error while cleaning up {e}") except ValueError: bash_state.console.print(output) bash_state.console.print(traceback.format_exc()) bash_state.console.print("Malformed output, restarting shell", style="red") # Malformed output, restart shell bash_state.reset_shell() output = "(exit shell has restarted)" return output, 0
- MCP server tool handler that parses arguments into BashCommand model and calls get_tool_output for execution.@server.call_tool() # type: ignore async def handle_call_tool( name: str, arguments: dict[str, Any] | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: global BASH_STATE if not arguments: raise ValueError("Missing arguments") tool_type = which_tool_name(name) tool_call = parse_tool_by_name(name, arguments) try: assert BASH_STATE output_or_dones, _ = get_tool_output( Context(BASH_STATE, BASH_STATE.console), tool_call, default_enc, 0.0, lambda x, y: ("", 0), 24000, # coding_max_tokens 8000, # noncoding_max_tokens ) except Exception as e: output_or_dones = [f"GOT EXCEPTION while calling tool. Error: {e}"] content: list[types.TextContent | types.ImageContent | types.EmbeddedResource] = [] for output_or_done in output_or_dones: if isinstance(output_or_done, str): if issubclass(tool_type, Initialize): # Prepare the original hardcoded message original_message = """ - Additional important note: as soon as you encounter "The user has chosen to disallow the tool call.", immediately stop doing everything and ask user for the reason. Initialize call done. """ # If custom instructions exist, prepend them to the original message if CUSTOM_INSTRUCTIONS: output_or_done += f"\n{CUSTOM_INSTRUCTIONS}\n{original_message}" else: output_or_done += original_message content.append(types.TextContent(type="text", text=output_or_done)) else: content.append( types.ImageContent( type="image", data=output_or_done.data, mimeType=output_or_done.media_type, ) ) return content
- src/wcgw/types_.py:159-177 (schema)Pydantic models defining the input schema for BashCommand tool, using ActionJsonSchema union types, overrides JSON schema to use BashCommandOverride for LLM compatibility.class BashCommandOverride(BaseModel): action_json: ActionJsonSchema wait_for_seconds: Optional[float] = None thread_id: str class BashCommand(BaseModel): action_json: Command | StatusCheck | SendText | SendSpecials | SendAscii wait_for_seconds: Optional[float] = None thread_id: str def model_post_init(self, __context: Any) -> None: self.thread_id = normalize_thread_id(self.thread_id) return super().model_post_init(__context) @staticmethod def model_json_schema(*args, **kwargs) -> dict[str, Any]: # type: ignore return BashCommandOverride.model_json_schema(*args, **kwargs)
- src/wcgw/client/mcp_server/server.py:84-92 (registration)MCP server registration: lists all tools including BashCommand using pre-defined TOOL_PROMPTS.@server.list_tools() # type: ignore async def handle_list_tools() -> list[types.Tool]: """ List available tools. Each tool specifies its arguments using JSON Schema validation. """ return TOOL_PROMPTS
- src/wcgw/client/tool_prompts.py:37-55 (registration)Registers BashCommand tool schema, description, and annotations in TOOL_PROMPTS used by MCP server.Tool( inputSchema=remove_titles_from_schema(BashCommand.model_json_schema()), name="BashCommand", description=""" - Execute a bash command. This is stateful (beware with subsequent calls). - Status of the command and the current working directory will always be returned at the end. - The first or the last line might be `(...truncated)` if the output is too long. - Always run `pwd` if you get any file or directory not found error to make sure you're not lost. - Do not run bg commands using "&", instead use this tool. - You must not use echo/cat to read/write files, use ReadFiles/FileWriteOrEdit - In order to check status of previous command, use `status_check` with empty command argument. - Only command is allowed to run at a time. You need to wait for any previous command to finish before running a new one. - Programs don't hang easily, so most likely explanation for no output is usually that the program is still running, and you need to check status again. - Do not send Ctrl-c before checking for status till 10 minutes or whatever is appropriate for the program to finish. - Only run long running commands in background. Each background command is run in a new non-reusable shell. - On running a bg command you'll get a bg command id that you should use to get status or interact. """, annotations=ToolAnnotations(destructiveHint=True, openWorldHint=True), ),