Skip to main content
Glama

BashCommand

Execute bash commands to run CLI operations, manage background processes, and check command status within the mcp-wcgw server environment.

Instructions

  • Execute a bash command. This is stateful (beware with subsequent calls).

  • Status of the command and the current working directory will always be returned at the end.

  • The first or the last line might be (...truncated) if the output is too long.

  • Always run pwd if you get any file or directory not found error to make sure you're not lost.

  • Do not run bg commands using "&", instead use this tool.

  • You must not use echo/cat to read/write files, use ReadFiles/FileWriteOrEdit

  • In order to check status of previous command, use status_check with empty command argument.

  • Only command is allowed to run at a time. You need to wait for any previous command to finish before running a new one.

  • Programs don't hang easily, so most likely explanation for no output is usually that the program is still running, and you need to check status again.

  • Do not send Ctrl-c before checking for status till 10 minutes or whatever is appropriate for the program to finish.

  • Only run long running commands in background. Each background command is run in a new non-reusable shell.

  • On running a bg command you'll get a bg command id that you should use to get status or interact.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
action_jsonYes
wait_for_secondsNo
thread_idYes

Implementation Reference

  • Core handler that executes BashCommand by interacting with pexpect.spawn shell process, handles sending commands/text/special keys, status checks, incremental output rendering, timeouts, background commands, etc.
    def execute_bash( bash_state: BashState, enc: EncoderDecoder[int], bash_arg: BashCommand, max_tokens: Optional[int], # This will be noncoding_max_tokens timeout_s: Optional[float], ) -> tuple[str, float]: try: # Check if the thread_id matches current if bash_arg.thread_id != bash_state.current_thread_id: # Try to load state from the thread_id if not bash_state.load_state_from_thread_id(bash_arg.thread_id): return ( f"Error: No saved bash state found for thread_id `{bash_arg.thread_id}`. Please initialize first with this ID.", 0.0, ) output, cost = _execute_bash(bash_state, enc, bash_arg, max_tokens, timeout_s) # Remove echo if it's a command if isinstance(bash_arg.action_json, Command): command = bash_arg.action_json.command.strip() if output.startswith(command): output = output[len(command) :] finally: bash_state.run_bg_expect_thread() if bash_state.over_screen: thread = threading.Thread( target=cleanup_orphaned_wcgw_screens, args=(bash_state.console,), daemon=True, ) thread.start() return output, cost def assert_single_statement(command: str) -> None: # Check for multiple statements using the bash statement parser if "\n" in command: try: parser = BashStatementParser() statements = parser.parse_string(command) except Exception: # Fall back to simple newline check if something goes wrong raise ValueError( "Command should not contain newline character in middle. Run only one command at a time." ) if len(statements) > 1: raise ValueError( "Error: Command contains multiple statements. Please run only one bash statement at a time." ) def get_bg_running_commandsinfo(bash_state: BashState) -> str: msg = "" running = [] for id_, state in bash_state.background_shells.items(): running.append(f"Command: {state.last_command}, bg_command_id: {id_}") if running: msg = ( "Following background commands are attached:\n" + "\n".join(running) + "\n" ) else: msg = "No command running in background.\n" return msg def _execute_bash( bash_state: BashState, enc: EncoderDecoder[int], bash_arg: BashCommand, max_tokens: Optional[int], # This will be noncoding_max_tokens timeout_s: Optional[float], ) -> tuple[str, float]: try: is_interrupt = False command_data = bash_arg.action_json is_bg = False og_bash_state = bash_state if not isinstance(command_data, Command) and command_data.bg_command_id: if command_data.bg_command_id not in bash_state.background_shells: error = f"No shell found running with command id {command_data.bg_command_id}.\n" if bash_state.background_shells: error += get_bg_running_commandsinfo(bash_state) if bash_state.state == "pending": error += f"On the main thread a command is already running ({bash_state.last_command})" else: error += "On the main thread no command is running." raise Exception(error) bash_state = bash_state.background_shells[command_data.bg_command_id] is_bg = True if isinstance(command_data, Command): if bash_state.bash_command_mode.allowed_commands == "none": return "Error: BashCommand not allowed in current mode", 0.0 bash_state.console.print(f"$ {command_data.command}") command = command_data.command.strip() assert_single_statement(command) if command_data.is_background: bash_state = bash_state.start_new_bg_shell(bash_state.cwd) is_bg = True if bash_state.state == "pending": raise ValueError(WAITING_INPUT_MESSAGE) bash_state.clear_to_run() for i in range(0, len(command), 64): bash_state.send(command[i : i + 64], set_as_command=None) bash_state.send(bash_state.linesep, set_as_command=command) elif isinstance(command_data, StatusCheck): bash_state.console.print("Checking status") if bash_state.state != "pending": error = "No running command to check status of.\n" error += get_bg_running_commandsinfo(bash_state) return error, 0.0 elif isinstance(command_data, SendText): if not command_data.send_text: return "Failure: send_text cannot be empty", 0.0 bash_state.console.print(f"Interact text: {command_data.send_text}") for i in range(0, len(command_data.send_text), 128): bash_state.send( command_data.send_text[i : i + 128], set_as_command=None ) bash_state.send(bash_state.linesep, set_as_command=None) elif isinstance(command_data, SendSpecials): if not command_data.send_specials: return "Failure: send_specials cannot be empty", 0.0 bash_state.console.print( f"Sending special sequence: {command_data.send_specials}" ) for char in command_data.send_specials: if char == "Key-up": bash_state.send("\033[A", set_as_command=None) elif char == "Key-down": bash_state.send("\033[B", set_as_command=None) elif char == "Key-left": bash_state.send("\033[D", set_as_command=None) elif char == "Key-right": bash_state.send("\033[C", set_as_command=None) elif char == "Enter": bash_state.send("\x0d", set_as_command=None) elif char == "Ctrl-c": bash_state.sendintr() is_interrupt = True elif char == "Ctrl-d": bash_state.sendintr() is_interrupt = True elif char == "Ctrl-z": bash_state.send("\x1a", set_as_command=None) else: raise Exception(f"Unknown special character: {char}") elif isinstance(command_data, SendAscii): if not command_data.send_ascii: return "Failure: send_ascii cannot be empty", 0.0 bash_state.console.print( f"Sending ASCII sequence: {command_data.send_ascii}" ) for ascii_char in command_data.send_ascii: bash_state.send(chr(ascii_char), set_as_command=None) if ascii_char == 3: is_interrupt = True else: raise ValueError(f"Unknown command type: {type(command_data)}") except KeyboardInterrupt: bash_state.sendintr() bash_state.expect(bash_state.prompt) return "---\n\nFailure: user interrupted the execution", 0.0 wait = min(timeout_s or CONFIG.timeout, CONFIG.timeout_while_output) index = bash_state.expect([bash_state.prompt, pexpect.TIMEOUT], timeout=wait) if index == 1: text = bash_state.before or "" incremental_text = _incremental_text(text, bash_state.pending_output) second_wait_success = False if is_status_check(bash_arg): # There's some text in BashInteraction mode wait for TIMEOUT_WHILE_OUTPUT remaining = CONFIG.timeout_while_output - wait patience = CONFIG.output_wait_patience if not incremental_text: patience -= 1 itext = incremental_text while remaining > 0 and patience > 0: index = bash_state.expect( [bash_state.prompt, pexpect.TIMEOUT], timeout=wait ) if index == 0: second_wait_success = True break else: _itext = bash_state.before or "" _itext = _incremental_text(_itext, bash_state.pending_output) if _itext != itext: patience = 3 else: patience -= 1 itext = _itext remaining = remaining - wait if not second_wait_success: text = bash_state.before or "" incremental_text = _incremental_text(text, bash_state.pending_output) if not second_wait_success: bash_state.set_pending(text) tokens = enc.encoder(incremental_text) if max_tokens and len(tokens) >= max_tokens: incremental_text = "(...truncated)\n" + enc.decoder( tokens[-(max_tokens - 1) :] ) if is_interrupt: incremental_text = ( incremental_text + """--- ---- Failure interrupting. You may want to try Ctrl-c again or program specific exit interactive commands. """ ) exit_status = get_status(bash_state, is_bg) incremental_text += exit_status if is_bg and bash_state.state == "repl": try: bash_state.cleanup() og_bash_state.background_shells.pop(bash_state.current_thread_id) except Exception as e: bash_state.console.log(f"error while cleaning up {e}") return incremental_text, 0 before = str(bash_state.before) output = _incremental_text(before, bash_state.pending_output) bash_state.set_repl() tokens = enc.encoder(output) if max_tokens and len(tokens) >= max_tokens: output = "(...truncated)\n" + enc.decoder(tokens[-(max_tokens - 1) :]) try: exit_status = get_status(bash_state, is_bg) output += exit_status if is_bg and bash_state.state == "repl": try: bash_state.cleanup() og_bash_state.background_shells.pop(bash_state.current_thread_id) except Exception as e: bash_state.console.log(f"error while cleaning up {e}") except ValueError: bash_state.console.print(output) bash_state.console.print(traceback.format_exc()) bash_state.console.print("Malformed output, restarting shell", style="red") # Malformed output, restart shell bash_state.reset_shell() output = "(exit shell has restarted)" return output, 0
  • MCP server tool handler that parses arguments into BashCommand model and calls get_tool_output for execution.
    @server.call_tool() # type: ignore async def handle_call_tool( name: str, arguments: dict[str, Any] | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: global BASH_STATE if not arguments: raise ValueError("Missing arguments") tool_type = which_tool_name(name) tool_call = parse_tool_by_name(name, arguments) try: assert BASH_STATE output_or_dones, _ = get_tool_output( Context(BASH_STATE, BASH_STATE.console), tool_call, default_enc, 0.0, lambda x, y: ("", 0), 24000, # coding_max_tokens 8000, # noncoding_max_tokens ) except Exception as e: output_or_dones = [f"GOT EXCEPTION while calling tool. Error: {e}"] content: list[types.TextContent | types.ImageContent | types.EmbeddedResource] = [] for output_or_done in output_or_dones: if isinstance(output_or_done, str): if issubclass(tool_type, Initialize): # Prepare the original hardcoded message original_message = """ - Additional important note: as soon as you encounter "The user has chosen to disallow the tool call.", immediately stop doing everything and ask user for the reason. Initialize call done. """ # If custom instructions exist, prepend them to the original message if CUSTOM_INSTRUCTIONS: output_or_done += f"\n{CUSTOM_INSTRUCTIONS}\n{original_message}" else: output_or_done += original_message content.append(types.TextContent(type="text", text=output_or_done)) else: content.append( types.ImageContent( type="image", data=output_or_done.data, mimeType=output_or_done.media_type, ) ) return content
  • Pydantic models defining the input schema for BashCommand tool, using ActionJsonSchema union types, overrides JSON schema to use BashCommandOverride for LLM compatibility.
    class BashCommandOverride(BaseModel): action_json: ActionJsonSchema wait_for_seconds: Optional[float] = None thread_id: str class BashCommand(BaseModel): action_json: Command | StatusCheck | SendText | SendSpecials | SendAscii wait_for_seconds: Optional[float] = None thread_id: str def model_post_init(self, __context: Any) -> None: self.thread_id = normalize_thread_id(self.thread_id) return super().model_post_init(__context) @staticmethod def model_json_schema(*args, **kwargs) -> dict[str, Any]: # type: ignore return BashCommandOverride.model_json_schema(*args, **kwargs)
  • MCP server registration: lists all tools including BashCommand using pre-defined TOOL_PROMPTS.
    @server.list_tools() # type: ignore async def handle_list_tools() -> list[types.Tool]: """ List available tools. Each tool specifies its arguments using JSON Schema validation. """ return TOOL_PROMPTS
  • Registers BashCommand tool schema, description, and annotations in TOOL_PROMPTS used by MCP server.
    Tool( inputSchema=remove_titles_from_schema(BashCommand.model_json_schema()), name="BashCommand", description=""" - Execute a bash command. This is stateful (beware with subsequent calls). - Status of the command and the current working directory will always be returned at the end. - The first or the last line might be `(...truncated)` if the output is too long. - Always run `pwd` if you get any file or directory not found error to make sure you're not lost. - Do not run bg commands using "&", instead use this tool. - You must not use echo/cat to read/write files, use ReadFiles/FileWriteOrEdit - In order to check status of previous command, use `status_check` with empty command argument. - Only command is allowed to run at a time. You need to wait for any previous command to finish before running a new one. - Programs don't hang easily, so most likely explanation for no output is usually that the program is still running, and you need to check status again. - Do not send Ctrl-c before checking for status till 10 minutes or whatever is appropriate for the program to finish. - Only run long running commands in background. Each background command is run in a new non-reusable shell. - On running a bg command you'll get a bg command id that you should use to get status or interact. """, annotations=ToolAnnotations(destructiveHint=True, openWorldHint=True), ),

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rusiaaman/wcgw'

If you have feedback or need assistance with the MCP directory API, please join our Discord server