Skip to main content
Glama
tumf

mcp-shell-server

by tumf

shell_execute

Execute whitelisted shell commands with stdin input and directory control through the MCP server for secure remote command execution.

Instructions

Execute a shell command
Allowed commands:
Allowed patterns:

Input Schema

TableJSON Schema
Name | Required | Description | Default
command | Yes | Command and its arguments as array |
stdin | No | Input to be passed to the command via stdin |
directory | Yes | Absolute path to a working directory where the command will be executed |
timeout | No | Maximum execution time in seconds |

Implementation Reference

  • The run_tool method in ExecuteToolHandler that handles the execution of the shell_execute tool, calling ShellExecutor.execute and formatting the response for MCP.
    async def run_tool(self, arguments: dict) -> Sequence[TextContent]:
        """Execute the shell command described by ``arguments``.

        Args:
            arguments: Tool-call arguments. Keys read here are ``command``
                (list of strings, required), ``stdin`` (optional),
                ``directory`` (falls back to ``/tmp`` when the key is absent)
                and ``timeout`` (optional number of seconds).

        Returns:
            A list holding one ``TextContent`` for stdout (if non-empty) and
            one for stderr (if non-empty and not the "cannot set terminal
            process group" notice).

        Raises:
            ValueError: If an argument is invalid, the command times out, or
                the executor reports an error in its result.
        """
        command = arguments.get("command", [])
        stdin = arguments.get("stdin")
        directory = arguments.get("directory", "/tmp")  # default to /tmp for safety
        timeout = arguments.get("timeout")

        if not command:
            raise ValueError("No command provided")

        if not isinstance(command, list):
            raise ValueError("'command' must be an array")

        # Only rejects an empty value here; whether the path exists is left
        # to the executor.
        if not directory:
            raise ValueError("Directory is required")

        # Reject bad timeouts up front so the caller gets a validation error
        # rather than a TypeError from the arithmetic below.
        if timeout is not None and (
            isinstance(timeout, bool)
            or not isinstance(timeout, (int, float))
            or timeout < 0
        ):
            raise ValueError("'timeout' must be a non-negative number")

        # Add small buffer to timeout for CI scheduling delays if timeout is specified
        actual_timeout = timeout + 0.5 if timeout is not None else None

        try:
            # The executor is called with timeout=None; the deadline is
            # enforced solely by asyncio.wait_for.
            result = await asyncio.wait_for(
                self.executor.execute(command, directory, stdin, None),
                timeout=actual_timeout,
            )
        except asyncio.TimeoutError as e:
            raise ValueError("Command execution timed out") from e

        if result.get("error"):
            raise ValueError(result["error"])

        content: list[TextContent] = []

        # Add stdout if present
        if result.get("stdout"):
            content.append(TextContent(type="text", text=result["stdout"]))

        # Add stderr if present (filter out specific messages)
        stderr = result.get("stderr")
        if stderr and "cannot set terminal process group" not in stderr:
            content.append(TextContent(type="text", text=stderr))

        return content
  • Tool schema definition returned by get_tool_description, including input schema with properties for command, stdin, directory, and timeout.
    return Tool(
        name=self.name,
        description=f"{self.description}\nAllowed commands: {allowed_commands}\nAllowed patterns: {allowed_patterns}",
        inputSchema={
            "type": "object",
            "properties": {
                "command": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Command and its arguments as array",
                },
                "stdin": {
                    "type": "string",
                    "description": "Input to be passed to the command via stdin",
                },
                "directory": {
                    "type": "string",
                    "description": "Absolute path to a working directory where the command will be executed",
                },
                "timeout": {
                    "type": "integer",
                    "description": "Maximum execution time in seconds",
                    "minimum": 0,
                },
            },
            "required": ["command", "directory"],
        },
    )
  • Registration of the shell_execute tool via MCP Server decorators list_tools and call_tool, with instantiation of ExecuteToolHandler.
    # Single handler instance used by both the list_tools and call_tool endpoints
    tool_handler = ExecuteToolHandler()
    
    
    @app.list_tools()
    async def list_tools() -> list[Tool]:
        """Return the tool descriptions exposed by this server (one entry)."""
        description = tool_handler.get_tool_description()
        return [description]
    
    
    @app.call_tool()
    async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]:
        """Dispatch a tool call to the shell-execute handler.

        Any failure (unknown tool name, non-dict arguments, or an error raised
        by the handler) is logged with its traceback and re-raised as a
        ``RuntimeError`` chained to the original exception.
        """
        try:
            expected_name = tool_handler.name
            if name != expected_name:
                raise ValueError(f"Unknown tool: {name}")

            arguments_are_dict = isinstance(arguments, dict)
            if not arguments_are_dict:
                raise ValueError("Arguments must be a dictionary")

            outcome = await tool_handler.run_tool(arguments)
            return outcome

        except Exception as err:
            trace_text = traceback.format_exc()
            logger.error(trace_text)
            message = f"Error executing command: {str(err)}"
            raise RuntimeError(message) from err
  • ShellExecutor.execute method containing the core logic for executing shell commands securely, including validation, preprocessing, and process management.
    async def execute(
        self,
        command: List[str],
        directory: str,
        stdin: Optional[str] = None,
        timeout: Optional[int] = None,
        envs: Optional[Dict[str, str]] = None,
    ) -> Dict[str, Any]:
        """Validate a command, run it through the default shell and report the result.

        Validation failures (``ValueError`` from the validator, preprocessor or
        I/O handler), timeouts and subprocess errors are returned as an error
        dictionary rather than raised. Commands containing a ``"|"`` token are
        delegated to ``_execute_pipeline``.

        Args:
            command: Command and its arguments as a list of tokens.
            directory: Working directory for the command.
            stdin: Optional text to send to the process; replaced by the
                redirect handler's ``stdin_data`` when that is a string.
            timeout: Passed through to the process manager / pipeline.
            envs: Passed through to process creation.

        Returns:
            On success, a dict with keys ``error`` (None), ``stdout``,
            ``stderr``, ``returncode``, ``status``, ``execution_time`` and
            ``directory``. On failure, a dict with keys ``error``, ``status``,
            ``stdout`` (empty), ``stderr`` (the error text) and
            ``execution_time``; ``status`` is -1 for a timeout and 1 otherwise.
        """
        start_time = time.time()
        process = None  # Assigned once the subprocess exists so ``finally`` can reap it

        def failure(message: str, status: int = 1) -> Dict[str, Any]:
            """Build the error-shaped result shared by every failure path."""
            return {
                "error": message,
                "status": status,
                "stdout": "",
                "stderr": message,
                "execution_time": time.time() - start_time,
            }

        def close_handle(handle: Any) -> None:
            """Close a redirected stdout handle; log close errors instead of raising."""
            # NOTE(review): if ``IO`` is ``typing.IO``, file objects returned by
            # open() are not instances of it, so this check (and the one that
            # adopts the redirect handle below) may never match — confirm
            # against what setup_redirects actually returns.
            if isinstance(handle, IO):
                try:
                    handle.close()
                except (IOError, OSError) as e:
                    logging.warning(f"Error closing stdout: {e}")

        try:
            # Validate directory if specified
            try:
                self._validate_directory(directory)
            except ValueError as e:
                return failure(str(e))

            # Process command
            preprocessed_command = self.preprocessor.preprocess_command(command)
            cleaned_command = self.preprocessor.clean_command(preprocessed_command)
            if not cleaned_command:
                return failure("Empty command")

            # First check for pipe operators and handle pipeline
            if "|" in cleaned_command:
                try:
                    # Validate pipeline first using the validator
                    self.validator.validate_pipeline(cleaned_command)

                    # Split commands
                    commands = self.preprocessor.split_pipe_commands(cleaned_command)
                    if not commands:
                        raise ValueError("Empty command before pipe operator")

                    return await self._execute_pipeline(
                        commands, directory, timeout, envs
                    )
                except ValueError as e:
                    return failure(str(e))

            # Single command: reject other shell operators, then parse and
            # validate. Every step reports a ValueError the same way.
            try:
                for token in cleaned_command:
                    self.validator.validate_no_shell_operators(token)
                # The redirects parsed here are discarded; they are re-derived
                # by the I/O handler below.
                cmd, _ = self.preprocessor.parse_command(cleaned_command)
                self.validator.validate_command(cmd)
            except ValueError as e:
                return failure(str(e))

            # Directory validation
            if directory:
                if not os.path.exists(directory):
                    return failure(f"Directory does not exist: {directory}")
                if not os.path.isdir(directory):
                    return failure(f"Not a directory: {directory}")
            # Defensive re-check; emptiness was already handled above.
            if not cleaned_command:  # pragma: no cover
                raise ValueError("Empty command")

            # Initialize stdout_handle with default value
            stdout_handle: Union[IO[Any], int] = asyncio.subprocess.PIPE

            try:
                # Process redirections
                cmd, redirects = self.io_handler.process_redirections(cleaned_command)

                # Setup handles for redirection
                handles = await self.io_handler.setup_redirects(redirects, directory)

                # Input redirection, when it yields a string, overrides the stdin argument
                stdin_data = handles.get("stdin_data")
                if isinstance(stdin_data, str):
                    stdin = stdin_data

                # Get stdout handle if present
                stdout_value = handles.get("stdout")
                if isinstance(stdout_value, (IO, int)):
                    stdout_handle = stdout_value

            except ValueError as e:
                return failure(str(e))

            # Execute the command with shell
            shell = self._get_default_shell()
            shell_cmd = self.preprocessor.create_shell_command(cmd)
            shell_cmd = f"{shell} -c {shlex.quote(shell_cmd)}"

            process = await self.process_manager.create_process(
                shell_cmd, directory, stdout_handle=stdout_handle, envs=envs
            )

            try:
                try:
                    # Run the process communication
                    stdout, stderr = await asyncio.shield(
                        self.process_manager.execute_with_timeout(
                            process, stdin=stdin, timeout=timeout
                        )
                    )

                    # Release the redirected file handle, if any
                    close_handle(stdout_handle)

                    # Handle case where returncode is None
                    final_returncode = (
                        0 if process.returncode is None else process.returncode
                    )

                    return {
                        "error": None,
                        "stdout": stdout.decode().strip() if stdout else "",
                        "stderr": stderr.decode().strip() if stderr else "",
                        "returncode": final_returncode,
                        "status": process.returncode,
                        "execution_time": time.time() - start_time,
                        "directory": directory,
                    }

                except asyncio.TimeoutError:
                    # Clean up the process after a timeout
                    if process and process.returncode is None:
                        try:
                            process.kill()
                            await asyncio.shield(process.wait())
                        except ProcessLookupError:
                            # Process already terminated
                            pass

                    # Clean up the redirected file handle
                    close_handle(stdout_handle)

                    return failure(
                        f"Command timed out after {timeout} seconds", status=-1
                    )

            except Exception as e:  # Exception handler for subprocess
                close_handle(stdout_handle)
                return failure(str(e))

        finally:
            if process and process.returncode is None:
                try:
                    process.kill()
                    await process.wait()
                except ProcessLookupError:
                    # The process exited on its own between the returncode
                    # check and kill(); do not let that mask the result.
                    pass
    
    async def _execute_pipeline(
Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tumf/mcp-shell-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.