mcp-wcgw

by rusiaaman
Verified
import os import subprocess import tempfile from typing import Generator import pytest from wcgw.client.bash_state.bash_state import BashState from wcgw.client.tools import ( BashCommand, Context, ContextSave, Initialize, ReadFiles, ReadImage, default_enc, get_tool_output, which_tool_name, ) from wcgw.types_ import ( Command, Console, FileWriteOrEdit, SendAscii, SendSpecials, SendText, StatusCheck, ) class TestConsole(Console): def __init__(self): self.logs = [] self.prints = [] def log(self, msg: str) -> None: self.logs.append(msg) def print(self, msg: str) -> None: self.prints.append(msg) @pytest.fixture def temp_dir() -> Generator[str, None, None]: """Provides a temporary directory for testing.""" with tempfile.TemporaryDirectory() as td: yield td @pytest.fixture def context(temp_dir: str) -> Generator[Context, None, None]: """Provides a test context with temporary directory and handles cleanup.""" console = TestConsole() bash_state = BashState( console=console, working_dir=temp_dir, bash_command_mode=None, file_edit_mode=None, write_if_empty_mode=None, mode=None, use_screen=True, ) ctx = Context( bash_state=bash_state, console=console, ) yield ctx # Cleanup after each test try: bash_state.sendintr() # Send Ctrl-C to any running process bash_state.reset_shell() # Reset shell state bash_state.cleanup() # Cleanup final shell except Exception as e: print(f"Error during cleanup: {e}") def test_initialize(context: Context, temp_dir: str) -> None: """Test the Initialize tool with various configurations.""" # Test default wcgw mode init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert temp_dir in outputs[0] assert "System:" in outputs[0] # Test architect mode init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="architect", code_writer_config=None, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) # Test code_writer mode with specific configuration code_writer_config = { "allowed_commands": ["ls", "pwd", "cat"], "allowed_globs": ["*.py", "*.txt"], } init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="code_writer", code_writer_config=code_writer_config, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) # Test with initial files to read and task resumption test_file = os.path.join(temp_dir, "test.txt") with open(test_file, "w") as f: f.write("test content") # First save context save_args = ContextSave( id="test_task_123", project_root_path=temp_dir, description="Test context", relevant_file_globs=["*.txt"], ) get_tool_output(context, save_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Now try to resume the saved context init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[test_file], task_id_to_resume="test_task_123", mode_name="wcgw", code_writer_config=None, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert test_file in outputs[0] # Should show the file in tree structure assert "Following is the retrieved" in outputs[0] # Verify context was retrieved # Test mode override when resuming context # First save context in wcgw mode new_test_file = os.path.join(temp_dir, "test2.txt") with open(new_test_file, "w") as f: f.write("test content 2") save_args = ContextSave( id="test_task_mode_switch", project_root_path=temp_dir, description="Test context with mode switch", relevant_file_globs=["*.txt"], ) get_tool_output(context, save_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Now try to resume the saved context but in architect mode init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[new_test_file], task_id_to_resume="test_task_mode_switch", mode_name="architect", # Different mode than what was used in saving code_writer_config=None, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert new_test_file in outputs[0] # Should show the file in tree structure assert "Following is the retrieved" in outputs[0] # Verify context was retrieved assert ( 'running in "architect" mode' in outputs[0].lower() ) # Verify mode was overridden to architect # Test with empty workspace path init_args = Initialize( type="first_call", any_workspace_path="", initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) # Test with non-existent workspace path nonexistent_path = os.path.join(temp_dir, "does_not_exist") init_args = Initialize( type="first_call", any_workspace_path=nonexistent_path, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert "does_not_exist" in outputs[0] # Should mention the path in output # Test with a file as workspace path file_as_workspace = os.path.join(temp_dir, "workspace.txt") with open(file_as_workspace, "w") as f: f.write("test content") init_args = Initialize( type="first_call", any_workspace_path=file_as_workspace, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert file_as_workspace in outputs[0] # Should show the file path def test_bash_command(context: Context, temp_dir: str) -> None: """Test the BashCommand tool.""" # First initialize init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) get_tool_output(context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Test when nothing is running cmd = BashCommand(action_json=StatusCheck(status_check=True)) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "No running command to check status of" in outputs[0] # Start a command and check status cmd = BashCommand(action_json=Command(command="sleep 1"), wait_for_seconds=0.1) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = still running" in outputs[0] # Check status while command is running status_check = BashCommand(action_json=StatusCheck(status_check=True)) outputs, _ = get_tool_output( context, status_check, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "status = process exited" in outputs[0] # Test simple command cmd = BashCommand(action_json=Command(command="echo 'hello world'")) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert "hello world" in outputs[0] def test_interaction_commands(context: Context, temp_dir: str) -> None: """Test the various interaction command types.""" # First initialize init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) get_tool_output(context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Test text interaction cmd = BashCommand(action_json=SendText(send_text="hello")) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) # Test special keys cmd = BashCommand(action_json=SendSpecials(send_specials=["Enter"])) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert "status = process exited" in outputs[0] # Send ctrl-c cmd = BashCommand(action_json=SendAscii(send_ascii=[3])) # Ctrl-C outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert "status = process exited" in outputs[0] # Test interactions with long running command cmd = BashCommand(action_json=Command(command="sleep 1"), wait_for_seconds=0.1) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = still running" in outputs[0] # Check status with special keys cmd = BashCommand(action_json=SendSpecials(send_specials=["Enter"])) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = process exited" in outputs[0] # Test interrupting command cmd = BashCommand(action_json=Command(command="sleep 1"), wait_for_seconds=0.1) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = still running" in outputs[0] # Send Ctrl-C cmd = BashCommand(action_json=SendSpecials(send_specials=["Ctrl-c"])) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "status = process exited" in outputs[0] def test_write_and_read_file(context: Context, temp_dir: str) -> None: """Test WriteIfEmpty and ReadFiles tools.""" # First initialize init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) get_tool_output(context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Test writing a file test_file = os.path.join(temp_dir, "test.txt") write_args = FileWriteOrEdit( file_path=test_file, percentage_to_change=100, file_content_or_search_replace_blocks="test content\n", ) outputs, _ = get_tool_output( context, write_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "Success" in outputs[0] # Test reading the file back read_args = ReadFiles(file_paths=[test_file]) outputs, _ = get_tool_output( context, read_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "test content" in outputs[0] # Simulate external modification of the file with open(test_file, "w") as f: f.write("modified content\n") # Attempt to write again write_args = FileWriteOrEdit( file_path=test_file, percentage_to_change=100, file_content_or_search_replace_blocks="new content\n", ) outputs, _ = get_tool_output( context, write_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) # Verify the error message assert "Error: the file has changed since last read." in outputs[0] test_file2 = os.path.join(temp_dir, "test2.txt") with open(test_file2, "w") as f: f.write("existing content\n") # Test writing to an existing file without reading it first (should warn) write_args = FileWriteOrEdit( file_path=test_file2, percentage_to_change=100, file_content_or_search_replace_blocks="new content\n", ) outputs, _ = get_tool_output( context, write_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert ( "Error: you need to read existing " in outputs[0] ) # Should fail with exception # Test writing after reading the file (should succeed with warning) read_args = ReadFiles(file_paths=[test_file2]) outputs, _ = get_tool_output( context, read_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) write_args = FileWriteOrEdit( file_path=test_file2, percentage_to_change=100, file_content_or_search_replace_blocks="new content after read\n", ) outputs, _ = get_tool_output( context, write_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "Success" in outputs[0] # Verify the new content was written read_args = ReadFiles(file_paths=[test_file2]) outputs, _ = get_tool_output( context, read_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "new content after read" in outputs[0] def test_context_save(context: Context, temp_dir: str) -> None: """Test the ContextSave tool.""" # First initialize init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) get_tool_output(context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Create some test files test_file1 = os.path.join(temp_dir, "test1.txt") test_file2 = os.path.join(temp_dir, "test2.txt") with open(test_file1, "w") as f: f.write("test content 1") with open(test_file2, "w") as f: f.write("test content 2") # Test saving context save_args = ContextSave( id="test_save", project_root_path=temp_dir, description="Test save", relevant_file_globs=["*.txt"], ) outputs, _ = get_tool_output( context, save_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) assert outputs[0].endswith(".txt") # Context files end with .txt extension def test_reinitialize(context: Context, temp_dir: str) -> None: """Test the tool with various mode changes.""" # First initialize init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) get_tool_output(context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Test shell reset without mode change reset_args = Initialize( type="user_asked_mode_change", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) outputs, _ = get_tool_output( context, reset_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "Reset successful" in outputs[0] assert "mode change" not in outputs[0].lower() # Test changing to architect mode reset_args = Initialize( type="user_asked_mode_change", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="architect", code_writer_config=None, ) outputs, _ = get_tool_output( context, reset_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "Reset successful with mode change to architect" in outputs[0] # Test changing to code_writer mode with config code_writer_config = {"allowed_commands": [], "allowed_globs": ["*.py"]} reset_args = Initialize( type="user_asked_mode_change", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="code_writer", code_writer_config=code_writer_config, ) outputs, _ = get_tool_output( context, reset_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "Reset successful with mode change to code_writer" in outputs[0] assert context.bash_state._write_if_empty_mode.allowed_globs == [ temp_dir + "/" + "*.py" ] assert context.bash_state.file_edit_mode.allowed_globs == [temp_dir + "/" + "*.py"] # Verify mode was actually changed by trying a command not in allowed list cmd = BashCommand(action_json=Command(command="touch test.txt")) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "Error: BashCommand not allowed in current mode" in str(outputs[0]) # Test changing to code_writer mode with config reset_args = Initialize( type="user_asked_change_workspace", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="shouldnot_proceed", mode_name="architect", code_writer_config=None, ) outputs, _ = get_tool_output( context, reset_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "Warning: task can only be resumed in a new conversation" in outputs[0] assert '"architect" mode' in outputs[0] # Test do not print prompt again # Test changing to code_writer mode with config reset_args = Initialize( type="user_asked_change_workspace", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="architect", code_writer_config=None, ) outputs, _ = get_tool_output( context, reset_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "architect mode" not in outputs[0] def _test_init(context: Context, temp_dir: str) -> None: """Initialize test environment.""" init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) get_tool_output(context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Reset shell to clean state context.bash_state.reset_shell() def test_file_io(context: Context, temp_dir: str) -> None: """Test reading from a file with cat.""" _test_init(context, temp_dir) test_file = os.path.join(temp_dir, "input.txt") with open(test_file, "w") as f: f.write("hello world") cmd = BashCommand(action_json=Command(command=f"cat {test_file}")) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "hello world" in outputs[0] assert "status = process exited" in outputs[0] def test_command_interrupt(context: Context, temp_dir: str) -> None: """Test Ctrl-C interruption.""" _test_init(context, temp_dir) cmd = BashCommand(action_json=Command(command="sleep 5"), wait_for_seconds=0.1) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = still running" in outputs[0] cmd = BashCommand(action_json=SendSpecials(send_specials=["Ctrl-c"])) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = process exited" in outputs[0] def test_command_suspend(context: Context, temp_dir: str) -> None: """Test Ctrl-Z suspension.""" _test_init(context, temp_dir) cmd = BashCommand(action_json=Command(command="sleep 5"), wait_for_seconds=0.1) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = still running" in outputs[0] def test_text_input(context: Context, temp_dir: str) -> None: """Test sending text to a program.""" _test_init(context, temp_dir) cmd = BashCommand(action_json=Command(command="cat")) get_tool_output(context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None) cmd = BashCommand(action_json=SendText(send_text="hello")) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "hello" in str(outputs[0]) cmd = BashCommand(action_json=SendSpecials(send_specials=["Ctrl-d"])) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = process exited" in str(outputs[0]) def test_ascii_input(context: Context, temp_dir: str) -> None: """Test sending ASCII codes.""" _test_init(context, temp_dir) cmd = BashCommand(action_json=Command(command="cat")) get_tool_output(context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None) cmd = BashCommand(action_json=SendAscii(send_ascii=[65, 66, 67])) # ABC outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "ABC" in str(outputs[0]) cmd = BashCommand(action_json=SendAscii(send_ascii=[3])) # Ctrl-C outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert "status = process exited" in str(outputs[0]) def test_read_image(context: Context, temp_dir: str) -> None: """Test the ReadImage tool.""" # First initialize init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) get_tool_output(context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Create a small test image test_image = os.path.join(temp_dir, "test.png") with open(test_image, "wb") as f: # Write a minimal valid PNG file f.write( bytes.fromhex( "89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4890000000d4944415478da63640000000600005c0010ef0000000049454e44ae426082" ) ) # Test reading image read_args = ReadImage(file_path=test_image) outputs, _ = get_tool_output( context, read_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert hasattr(outputs[0], "media_type") assert outputs[0].media_type == "image/png" assert hasattr(outputs[0], "data") def test_which_tool_name() -> None: """Test the which_tool_name function.""" # Test each tool type assert which_tool_name("BashCommand") == BashCommand assert which_tool_name("FileWriteOrEdit") == FileWriteOrEdit assert which_tool_name("ReadImage") == ReadImage assert which_tool_name("ReadFiles") == ReadFiles assert which_tool_name("Initialize") == Initialize assert which_tool_name("ContextSave") == ContextSave # Test error case with unknown tool with pytest.raises(ValueError) as exc_info: which_tool_name("UnknownTool") assert "Unknown tool name: UnknownTool" in str(exc_info.value) def test_git_recent_files(context: Context, temp_dir: str) -> None: """Test git repository recent files feature with 100 files in batches of 20.""" # Initialize a git repository os.chdir(temp_dir) subprocess.run(["git", "init"], check=True) subprocess.run(["git", "config", "user.email", "test@example.com"], check=True) subprocess.run(["git", "config", "user.name", "Test User"], check=True) # Create 100 files in 5 batches of 20 files each all_files = [] for batch in range(20): batch_files = [] # Create 5 files per batch for i in range(5): file_num = batch * 5 + i + 1 file_name = f"file{file_num:03d}.txt" batch_files.append(file_name) with open(os.path.join(temp_dir, file_name), "w") as f: f.write(f"Content for {file_name}") subprocess.run(["git", "add", file_name], check=True) # Commit this batch subprocess.run(["git", "commit", "-m", f"Add batch {batch + 1}"], check=True) all_files.extend(batch_files) recent_files = all_files[-10:] # Initialize with the git repository init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) outputs, _ = get_tool_output( context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert isinstance(outputs[0], str) # Check that the output contains files from the more recent commits repo_structure = outputs[0] # All 10 recent files should be in structure for file in recent_files: assert file in repo_structure def test_error_cases(context: Context, temp_dir: str) -> None: """Test various error cases.""" # First initialize init_args = Initialize( type="first_call", any_workspace_path=temp_dir, initial_files_to_read=[], task_id_to_resume="", mode_name="wcgw", code_writer_config=None, ) get_tool_output(context, init_args, default_enc, 1.0, lambda x, y: ("", 0.0), None) # Test reading non-existent file read_args = ReadFiles(file_paths=[os.path.join(temp_dir, "nonexistent.txt")]) outputs, _ = get_tool_output( context, read_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "Error" in outputs[0] # Test writing to non-existent directory write_args = FileWriteOrEdit( file_path=os.path.join(temp_dir, "nonexistent", "test.txt"), file_content_or_search_replace_blocks="test", percentage_to_change=100, ) outputs, _ = get_tool_output( context, write_args, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "Success" in outputs[0] # Should succeed as it creates directories # Test invalid bash command cmd = BashCommand(action_json=Command(command="nonexistentcommand")) outputs, _ = get_tool_output( context, cmd, default_enc, 1.0, lambda x, y: ("", 0.0), None ) assert len(outputs) == 1 assert "not found" in str(outputs[0]).lower()