shell_execute
Execute whitelisted shell commands remotely with support for stdin input and specified working directory using the MCP protocol for secure and controlled operations.
Instructions
Execute a shell command. Allowed commands: (none listed)
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| command | Yes | Command and its arguments as array | |
| directory | Yes | Absolute path to a working directory where the command will be executed | |
| stdin | No | Input to be passed to the command via stdin | |
| timeout | No | Maximum execution time in seconds | |
Implementation Reference
# src/mcp_shell_server/server.py:75-123 (handler)
# The run_tool method implements the core execution logic for the shell_execute tool. It processes arguments, handles timeouts, calls ShellExecutor.execute, and returns results as MCP TextContent.
async def run_tool(self, arguments: dict) -> Sequence[TextContent]:
    """Execute the shell command described by ``arguments``.

    Args:
        arguments: Tool-call arguments. Reads ``command`` (non-empty list),
            ``directory`` (falls back to ``/tmp`` when the key is absent),
            and the optional ``stdin`` and ``timeout`` (seconds) entries.

    Returns:
        A list of ``TextContent`` items: stdout first (when non-empty),
        then stderr (when non-empty and not the terminal-process-group
        notice).

    Raises:
        ValueError: If an argument is missing or malformed, if the command
            does not finish in time, or if the executor reports an error.
    """
    command = arguments.get("command", [])
    stdin = arguments.get("stdin")
    directory = arguments.get("directory", "/tmp")  # default to /tmp for safety
    timeout = arguments.get("timeout")

    if not command:
        raise ValueError("No command provided")

    if not isinstance(command, list):
        raise ValueError("'command' must be an array")

    # Only rejects an empty/None value here; existence of the directory is
    # not checked in this method.
    if not directory:
        raise ValueError("Directory is required")

    # Reject non-numeric, boolean, or negative timeouts up front so that the
    # arithmetic below cannot fail with an unrelated TypeError.
    if timeout is not None and (
        isinstance(timeout, bool)
        or not isinstance(timeout, (int, float))
        or timeout < 0
    ):
        raise ValueError("'timeout' must be a non-negative number")

    # Add small buffer to timeout for CI scheduling delays if timeout is specified
    actual_timeout = timeout + 0.5 if timeout is not None else None

    try:
        # The executor is given no timeout of its own (None); the deadline
        # is enforced here by asyncio.wait_for.
        result = await asyncio.wait_for(
            self.executor.execute(command, directory, stdin, None),
            timeout=actual_timeout,
        )
    except asyncio.TimeoutError as e:
        raise ValueError("Command execution timed out") from e

    if result.get("error"):
        raise ValueError(result["error"])

    content: list[TextContent] = []

    # Add stdout if present
    if result.get("stdout"):
        content.append(TextContent(type="text", text=result["stdout"]))

    # Add stderr if present; the whole stderr text is dropped when it
    # contains the terminal-process-group notice.
    stderr = result.get("stderr")
    if stderr and "cannot set terminal process group" not in stderr:
        content.append(TextContent(type="text", text=stderr))

    return content
# src/mcp_shell_server/server.py:41-73 (schema)
# The get_tool_description method defines the Tool schema for shell_execute, including inputSchema with properties for command, stdin, directory, and timeout.
def get_tool_description(self) -> Tool:
    """Get the tool description for the execute command.

    The description text is ``self.description`` followed by the
    comma-separated allowed commands and allowed patterns. The input
    schema declares ``command`` and ``directory`` as required, and
    ``stdin`` and ``timeout`` as optional.

    Returns:
        The ``Tool`` definition named ``self.name``.
    """
    allowed_commands = ", ".join(self.get_allowed_commands())
    allowed_patterns = ", ".join(self.get_allowed_patterns())
    return Tool(
        name=self.name,
        description=f"{self.description}\nAllowed commands: {allowed_commands}\nAllowed patterns: {allowed_patterns}",
        inputSchema={
            "type": "object",
            "properties": {
                "command": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Command and its arguments as array",
                },
                "stdin": {
                    "type": "string",
                    "description": "Input to be passed to the command via stdin",
                },
                "directory": {
                    "type": "string",
                    "description": "Absolute path to a working directory where the command will be executed",
                },
                "timeout": {
                    "type": "integer",
                    "description": "Maximum execution time in seconds",
                    "minimum": 0,
                },
            },
            "required": ["command", "directory"],
        },
    )
# src/mcp_shell_server/server.py:126-151 (registration)
# Tool handler instantiation and registration via @app.list_tools() and @app.call_tool() decorators, which list the tool and dispatch calls to the handler.

# Single handler instance shared by both registered callbacks
tool_handler = ExecuteToolHandler()


@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return the descriptions of every tool this server exposes."""
    description = tool_handler.get_tool_description()
    return [description]


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]:
    """Validate a tool call and forward it to the handler.

    Any failure (unknown tool name, non-dict arguments, or an error raised
    by the handler) is logged with its traceback and re-raised as a
    ``RuntimeError`` chained to the original exception.
    """
    try:
        is_known_tool = name == tool_handler.name
        if not is_known_tool:
            raise ValueError(f"Unknown tool: {name}")

        if not isinstance(arguments, dict):
            raise ValueError("Arguments must be a dictionary")

        outcome = await tool_handler.run_tool(arguments)
        return outcome
    except Exception as exc:
        logger.error(traceback.format_exc())
        raise RuntimeError(f"Error executing command: {exc}") from exc
- ShellExecutor.execute method contains the primary shell execution logic delegated by the tool handler, including command validation, preprocessing, IO redirection, and process management.async def execute( self, command: List[str], directory: str, stdin: Optional[str] = None, timeout: Optional[int] = None, envs: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: start_time = time.time() process = None # Initialize process variable try: # Validate directory if specified try: self._validate_directory(directory) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Process command preprocessed_command = self.preprocessor.preprocess_command(command) cleaned_command = self.preprocessor.clean_command(preprocessed_command) if not cleaned_command: return { "error": "Empty command", "status": 1, "stdout": "", "stderr": "Empty command", "execution_time": time.time() - start_time, } # First check for pipe operators and handle pipeline if "|" in cleaned_command: try: # Validate pipeline first using the validator try: self.validator.validate_pipeline(cleaned_command) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Split commands commands = self.preprocessor.split_pipe_commands(cleaned_command) if not commands: raise ValueError("Empty command before pipe operator") return await self._execute_pipeline( commands, directory, timeout, envs ) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Then check for other shell operators for token in cleaned_command: try: self.validator.validate_no_shell_operators(token) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Single command execution try: cmd, redirects = 
self.preprocessor.parse_command(cleaned_command) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } try: self.validator.validate_command(cmd) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Directory validation if directory: if not os.path.exists(directory): return { "error": f"Directory does not exist: {directory}", "status": 1, "stdout": "", "stderr": f"Directory does not exist: {directory}", "execution_time": time.time() - start_time, } if not os.path.isdir(directory): return { "error": f"Not a directory: {directory}", "status": 1, "stdout": "", "stderr": f"Not a directory: {directory}", "execution_time": time.time() - start_time, } if not cleaned_command: # pragma: no cover raise ValueError("Empty command") # Initialize stdout_handle with default value stdout_handle: Union[IO[Any], int] = asyncio.subprocess.PIPE try: # Process redirections cmd, redirects = self.io_handler.process_redirections(cleaned_command) # Setup handles for redirection handles = await self.io_handler.setup_redirects(redirects, directory) # Get stdin and stdout from handles if present stdin_data = handles.get("stdin_data") if isinstance(stdin_data, str): stdin = stdin_data # Get stdout handle if present stdout_value = handles.get("stdout") if isinstance(stdout_value, (IO, int)): stdout_handle = stdout_value except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Execute the command with shell shell = self._get_default_shell() shell_cmd = self.preprocessor.create_shell_command(cmd) shell_cmd = f"{shell} -c {shlex.quote(shell_cmd)}" process = await self.process_manager.create_process( shell_cmd, directory, stdout_handle=stdout_handle, envs=envs ) try: # Send input if provided stdin_bytes = stdin.encode() if stdin else None 
async def communicate_with_timeout(): # pragma: no cover try: return await process.communicate(input=stdin_bytes) except Exception as e: try: await process.wait() except Exception: pass raise e try: # プロセス通信実行 stdout, stderr = await asyncio.shield( self.process_manager.execute_with_timeout( process, stdin=stdin, timeout=timeout ) ) # ファイルハンドル処理 if isinstance(stdout_handle, IO): try: stdout_handle.close() except (IOError, OSError) as e: logging.warning(f"Error closing stdout: {e}") # Handle case where returncode is None final_returncode = ( 0 if process.returncode is None else process.returncode ) return { "error": None, "stdout": stdout.decode().strip() if stdout else "", "stderr": stderr.decode().strip() if stderr else "", "returncode": final_returncode, "status": process.returncode, "execution_time": time.time() - start_time, "directory": directory, } except asyncio.TimeoutError: # タイムアウト時のプロセスクリーンアップ if process and process.returncode is None: try: process.kill() await asyncio.shield(process.wait()) except ProcessLookupError: # Process already terminated pass # ファイルハンドルクリーンアップ if isinstance(stdout_handle, IO): stdout_handle.close() return { "error": f"Command timed out after {timeout} seconds", "status": -1, "stdout": "", "stderr": f"Command timed out after {timeout} seconds", "execution_time": time.time() - start_time, } except Exception as e: # Exception handler for subprocess if isinstance(stdout_handle, IO): stdout_handle.close() return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } finally: if process and process.returncode is None: process.kill() await process.wait() async def _execute_pipeline(