execute_command
Execute commands on remote SSH hosts using persistent connections. Connect to servers via hostname, IP, or SSH config aliases to run commands and manage remote systems.
Instructions
Execute a command on an SSH host using a persistent session.
The host parameter can be either a hostname/IP or an SSH config alias. If an SSH config alias is provided, configuration will be read from ~/.ssh/config.
Args: host: Hostname, IP address, or SSH config alias (e.g., "myserver") command: Command to execute username: SSH username (optional, will use SSH config or current user) password: Password (optional) key_filename: Path to SSH key file (optional, will use SSH config) port: SSH port (optional, will use SSH config or default 22)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| command | Yes | ||
| host | Yes | ||
| key_filename | No | ||
| password | No | ||
| port | No | ||
| username | No |
Implementation Reference
- mcp_ssh_session/server.py:12-112 (handler)The primary MCP tool handler for 'execute_command'. Decorated with @mcp.tool() which serves as both registration and schema definition via type hints and docstring. Calls SSHSessionManager.execute_command() and handles responses, including async transitions (exit 124) and input awaiting.@mcp.tool() def execute_command( host: str, command: str, username: Optional[str] = None, password: Optional[str] = None, key_filename: Optional[str] = None, port: Optional[int] = None, enable_password: Optional[str] = None, enable_command: str = "enable", sudo_password: Optional[str] = None, timeout: int = 30 ) -> str: """Execute a command on an SSH host using a persistent session. Starts synchronously and waits for completion. If the command doesn't complete within the timeout, it automatically transitions to async mode and returns a command ID for tracking. The host parameter can be either a hostname/IP or an SSH config alias. If an SSH config alias is provided, configuration will be read from ~/.ssh/config. For network devices (routers, switches), use enable_password to automatically enter privileged/enable mode before executing commands. For Unix/Linux hosts requiring sudo, use sudo_password to automatically handle the sudo password prompt. The command will be automatically prefixed with 'sudo' if not already present. Args: host: Hostname, IP address, or SSH config alias (e.g., "myserver") command: Command to execute username: SSH username (optional, will use SSH config or current user) password: Password (optional) key_filename: Path to SSH key file (optional, will use SSH config) port: SSH port (optional, will use SSH config or default 22) enable_password: Enable mode password for network devices (optional) enable_command: Command to enter enable mode (default: "enable") sudo_password: Password for sudo commands on Unix/Linux hosts (optional) timeout: Timeout in seconds for command execution (default: 30) """ logger = session_manager.logger.getChild('tool_execute_command') logger.info(f"Executing command on {host}: {command[:100]}...") try: stdout, stderr, exit_status = session_manager.execute_command( host=host, username=username, command=command, password=password, key_filename=key_filename, port=port, enable_password=enable_password, enable_command=enable_command, sudo_password=sudo_password, timeout=timeout, ) except ConnectionError as e: logger.error(f"Connection error: {e}") return f"Connection Error: {e}" except Exception as e: logger.error(f"Exception during execute_command: {e}", exc_info=True) return f"Error: {e}" # Check if command transitioned to async mode if exit_status == 124: if stderr.startswith("ASYNC:"): command_id = stderr.split(":", 1)[1] response = ( f"Command exceeded timeout of {timeout}s and is now running in background.\n\n" f"Command ID: {command_id}\n\n" f"Use get_command_status('{command_id}') to check progress.\n" f"Use interrupt_command_by_id('{command_id}') to stop it." ) logger.warning(f"Command timed out, returning async response for command_id {command_id}") return response if stderr.startswith("AWAITING_INPUT:"): # Format: AWAITING_INPUT:command_id:reason parts = stderr.split(":", 2) command_id = parts[1] reason = parts[2] if len(parts) > 2 else "unknown" response = ( f"Command paused waiting for user input ({reason}).\n\n" f"Command ID: {command_id}\n\n" f"Use send_input('{command_id}', 'your_input\\n') to provide input.\n" f"For example, if it's a password, provide the password followed by \\n." ) logger.info(f"Command awaiting input, returning instructions for command_id {command_id}") return response result = f"Exit Status: {exit_status}\n\n" if stdout: result += f"STDOUT:\n{stdout}\n" if stderr: result += f"STDERR:\n{stderr}\n" logger.info(f"Command finished with exit status {exit_status}.") logger.debug(f"Returning result:\n{result}") return result
- Core execution logic in CommandExecutor.execute_command(). Starts execution asynchronously via execute_command_async(), then polls for completion, handles timeouts by transitioning to background async, and detects awaiting input. Called by SSHSessionManager.execute_command().def execute_command(self, host: str, username: Optional[str] = None, command: str = "", password: Optional[str] = None, key_filename: Optional[str] = None, port: Optional[int] = None, enable_password: Optional[str] = None, enable_command: str = "enable", sudo_password: Optional[str] = None, timeout: int = 30) -> tuple[str, str, int]: """Execute a command on a host using persistent session.""" logger = self.logger.getChild('execute_command') logger.info(f"[EXEC_REQ] host={host}, cmd={command[:100]}..., timeout={timeout}") # Validate command is_valid, error_msg = self._session_manager._command_validator.validate_command(command) if not is_valid: logger.warning(f"[EXEC_INVALID] {error_msg}") return "", error_msg, 1 # Start async logger.debug(f"[EXEC_ASYNC_START] Starting async execution") try: command_id = self.execute_command_async( host, username, command, password, key_filename, port, sudo_password, enable_password, enable_command, timeout ) except Exception as e: return "", str(e), 1 logger.debug(f"[EXEC_ASYNC_ID] command_id={command_id}") # Poll until done or timeout start = time.time() poll_count = 0 idle_threshold = getattr(self._session_manager, 'SYNC_IDLE_TO_ASYNC', 0) last_activity = start last_stdout = "" last_stderr = "" while time.time() - start < timeout: status = self.get_command_status(command_id) poll_count += 1 if 'error' in status: logger.error(f"[EXEC_ERROR] {status['error']}") return "", status['error'], 1 if status['status'] == 'awaiting_input': reason = status.get('awaiting_input_reason', 'unknown') logger.info(f"[EXEC_AWAIT] Command {command_id} waiting for input: {reason}") # Return immediately for awaiting input, let the tool handle it. return "", f"AWAITING_INPUT:{command_id}:{reason}", 124 if status['status'] != 'running': logger.info(f"[EXEC_DONE] status={status['status']}, polls={poll_count}, duration={time.time() - start:.2f}s") return status['stdout'], status['stderr'], status['exit_code'] or 0 # If the command is running but has been idle for SYNC_IDLE_TO_ASYNC, it means it *should* transition to async # and we should continue polling until the full timeout for this sync execute_command call. # The `_execute_standard_command_internal` function will return `ASYNC:{command_id}` if it hits its idle timeout. # The outer `execute_command` (sync tool) should then continue to poll this async ID. # This block is for when the *internal* async transition happens, but the outer sync call should keep waiting. # If we reached here, it means the internal worker is still 'running' but might be idle or waiting internally. time.sleep(0.1) # If we reach here, the command genuinely timed out based on the outer `timeout` parameter. logger.warning(f"[EXEC_TIMEOUT] Command {command_id} timed out after {timeout}s") status_on_timeout = self.get_command_status(command_id) return status_on_timeout.get('stdout', ''), f"ASYNC:{command_id}", 124
- Wrapper method in SSHSessionManager.execute_command() that delegates to self.command_executor.execute_command(). Provides the entry point from the MCP handler.def execute_command(self, host: str, username: Optional[str] = None, command: str = "", password: Optional[str] = None, key_filename: Optional[str] = None, port: Optional[int] = None, enable_password: Optional[str] = None, enable_command: str = "enable", sudo_password: Optional[str] = None, timeout: int = 30) -> tuple[str, str, int]: """Execute a command on a host using persistent session.""" return self.command_executor.execute_command( host, username, command, password, key_filename, port, enable_password, enable_command, sudo_password, timeout )
- mcp_ssh_session/server.py:12-12 (registration)The @mcp.tool() decorator registers the execute_command function as an MCP tool with the name 'execute_command'.@mcp.tool()