shell_execute
Execute authorized shell commands securely with stdin support using MCP Shell Server. Specify commands, working directory, and timeout for controlled, efficient command execution.
Instructions
Execute um comando shell Comandos permitidos:
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| command | Yes | Comando e seus argumentos como array | |
| directory | Yes | Diretório de trabalho onde o comando será executado | |
| stdin | No | Entrada a ser passada para o comando via stdin | |
| timeout | No | Tempo máximo de execução em segundos |
Implementation Reference
- src/mcp_shell_server/server.py:20-115 (handler)ExecuteToolHandler class: the primary handler for 'shell_execute' tool, sets name, provides schema via get_tool_description, and implements run_tool which validates args, handles timeout, executes via ShellExecutor, and formats TextContent output.class ExecuteToolHandler: """Manipulador para execução de comandos shell""" name = "shell_execute" description = "Execute um comando shell" def __init__(self): self.executor = ShellExecutor() def get_allowed_commands(self) -> list[str]: """Obtém os comandos permitidos""" return self.executor.validator.get_allowed_commands() def get_tool_description(self) -> Tool: """Obtém a descrição da ferramenta para o comando execute""" return Tool( name=self.name, description=( f"{self.description}\n" f"Comandos permitidos: {', '.join(self.get_allowed_commands())}" ), inputSchema={ "type": "object", "properties": { "command": { "type": "array", "items": {"type": "string"}, "description": "Comando e seus argumentos como array", }, "stdin": { "type": "string", "description": "Entrada a ser passada para o comando via stdin", }, "directory": { "type": "string", "description": "Diretório de trabalho onde o comando será executado", }, "timeout": { "type": "integer", "description": "Tempo máximo de execução em segundos", "minimum": 0, }, }, "required": ["command", "directory"], }, ) async def run_tool(self, arguments: dict) -> Sequence[TextContent]: """Executa o comando shell com os argumentos fornecidos""" command = arguments.get("command", []) stdin = arguments.get("stdin") directory = arguments.get("directory", "/tmp") # padrão para /tmp por segurança timeout = arguments.get("timeout") if not command: raise ValueError("Nenhum comando fornecido") if not isinstance(command, list): raise ValueError("'command' deve ser um array") # Certifica-se de que o diretório existe if not directory: raise ValueError("Diretório é obrigatório") content: list[TextContent] = [] try: # Trata execução com timeout try: result = await asyncio.wait_for( self.executor.execute( command, directory, stdin, None ), # Passa None para timeout timeout=timeout, ) except asyncio.TimeoutError as e: raise ValueError("Tempo de execução do comando esgotado") from e if result.get("error"): raise ValueError(result["error"]) # Adiciona stdout se presente if result.get("stdout"): content.append(TextContent(type="text", text=result["stdout"])) # Adiciona stderr se presente (filtra mensagens específicas) stderr = result.get("stderr") if stderr and "cannot set terminal process group" not in stderr: content.append(TextContent(type="text", text=stderr)) except asyncio.TimeoutError as e: raise ValueError( f"Tempo de execução do comando esgotado após {timeout} segundos" ) from e return content
- src/mcp_shell_server/server.py:121-125 (registration)Registration of the shell_execute tool via MCP Server's list_tools decorator, returning the tool's Tool object with schema.@app.list_tools() async def list_tools() -> list[Tool]: """Lista ferramentas disponíveis.""" return [tool_handler.get_tool_description()]
- src/mcp_shell_server/server.py:127-142 (registration)Registration of tool call handler via MCP Server's call_tool decorator, dispatches to ExecuteToolHandler.run_tool if name=='shell_execute'.@app.call_tool() async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]: """Manipula chamadas de ferramentas""" try: if name != tool_handler.name: raise ValueError(f"Ferramenta desconhecida: {name}") if not isinstance(arguments, dict): raise ValueError("Argumentos devem ser um dicionário") return await tool_handler.run_tool(arguments) except Exception as e: logger.error(traceback.format_exc()) raise RuntimeError(f"Erro ao executar comando: {str(e)}") from e
- ShellExecutor.execute: core helper method called by the handler, performs command validation, directory checks, preprocessing, IO redirection, subprocess execution (including pipelines), timeout handling, and returns result dict with stdout/stderr/status.async def execute( self, command: List[str], directory: str, stdin: Optional[str] = None, timeout: Optional[int] = None, envs: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: start_time = time.time() process = None # Initialize process variable try: # Validate directory if specified try: self._validate_directory(directory) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Process command preprocessed_command = self.preprocessor.preprocess_command(command) cleaned_command = self.preprocessor.clean_command(preprocessed_command) if not cleaned_command: return { "error": "Empty command", "status": 1, "stdout": "", "stderr": "Empty command", "execution_time": time.time() - start_time, } # First check for pipe operators and handle pipeline if "|" in cleaned_command: try: # Validate pipeline first using the validator try: self.validator.validate_pipeline(cleaned_command) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Split commands commands = self.preprocessor.split_pipe_commands(cleaned_command) if not commands: raise ValueError("Empty command before pipe operator") return await self._execute_pipeline( commands, directory, timeout, envs ) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Then check for other shell operators for token in cleaned_command: try: self.validator.validate_no_shell_operators(token) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Single command execution try: cmd, redirects = self.preprocessor.parse_command(cleaned_command) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } try: self.validator.validate_command(cmd) except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Directory validation if directory: if not os.path.exists(directory): return { "error": f"Directory does not exist: {directory}", "status": 1, "stdout": "", "stderr": f"Directory does not exist: {directory}", "execution_time": time.time() - start_time, } if not os.path.isdir(directory): return { "error": f"Not a directory: {directory}", "status": 1, "stdout": "", "stderr": f"Not a directory: {directory}", "execution_time": time.time() - start_time, } if not cleaned_command: raise ValueError("Empty command") # Initialize stdout_handle with default value stdout_handle: Union[IO[Any], int] = asyncio.subprocess.PIPE try: # Process redirections cmd, redirects = self.io_handler.process_redirections(cleaned_command) # Setup handles for redirection handles = await self.io_handler.setup_redirects(redirects, directory) # Get stdin and stdout from handles if present stdin_data = handles.get("stdin_data") if isinstance(stdin_data, str): stdin = stdin_data # Get stdout handle if present stdout_value = handles.get("stdout") if isinstance(stdout_value, (IO, int)): stdout_handle = stdout_value except ValueError as e: return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } # Execute the command with interactive shell shell = self._get_default_shell() shell_cmd = self.preprocessor.create_shell_command(cmd) shell_cmd = f"{shell} -i -c {shlex.quote(shell_cmd)}" process = await self.process_manager.create_process( shell_cmd, directory, stdout_handle=stdout_handle, envs=envs ) try: # Send input if provided stdin_bytes = stdin.encode() if stdin else None async def communicate_with_timeout(): try: return await process.communicate(input=stdin_bytes) except Exception as e: try: await process.wait() except Exception: pass raise e try: # プロセス通信実行 stdout, stderr = await asyncio.shield( self.process_manager.execute_with_timeout( process, stdin=stdin, timeout=timeout ) ) # ファイルハンドル処理 if isinstance(stdout_handle, IO): try: stdout_handle.close() except (IOError, OSError) as e: logging.warning(f"Error closing stdout: {e}") # Handle case where returncode is None final_returncode = ( 0 if process.returncode is None else process.returncode ) return { "error": None, "stdout": stdout.decode().strip() if stdout else "", "stderr": stderr.decode().strip() if stderr else "", "returncode": final_returncode, "status": process.returncode, "execution_time": time.time() - start_time, "directory": directory, } except asyncio.TimeoutError: # タイムアウト時のプロセスクリーンアップ if process and process.returncode is None: try: process.kill() await asyncio.shield(process.wait()) except ProcessLookupError: # Process already terminated pass # ファイルハンドルクリーンアップ if isinstance(stdout_handle, IO): stdout_handle.close() return { "error": f"Command timed out after {timeout} seconds", "status": -1, "stdout": "", "stderr": f"Command timed out after {timeout} seconds", "execution_time": time.time() - start_time, } except Exception as e: # Exception handler for subprocess if isinstance(stdout_handle, IO): stdout_handle.close() return { "error": str(e), "status": 1, "stdout": "", "stderr": str(e), "execution_time": time.time() - start_time, } finally: if process and process.returncode is None: process.kill() await process.wait() async def _execute_pipeline(
- src/mcp_shell_server/server.py:35-65 (schema)inputSchema definition for shell_execute tool: requires 'command' array and 'directory', optional 'stdin' and 'timeout'.return Tool( name=self.name, description=( f"{self.description}\n" f"Comandos permitidos: {', '.join(self.get_allowed_commands())}" ), inputSchema={ "type": "object", "properties": { "command": { "type": "array", "items": {"type": "string"}, "description": "Comando e seus argumentos como array", }, "stdin": { "type": "string", "description": "Entrada a ser passada para o comando via stdin", }, "directory": { "type": "string", "description": "Diretório de trabalho onde o comando será executado", }, "timeout": { "type": "integer", "description": "Tempo máximo de execução em segundos", "minimum": 0, }, }, "required": ["command", "directory"], }, )