execute_command

Execute commands on remote SSH hosts using persistent connections. Connect to servers via hostname, IP, or SSH config aliases to run commands and manage remote systems.

Instructions

Execute a command on an SSH host using a persistent session.

The host parameter can be either a hostname/IP or an SSH config alias. If an SSH config alias is provided, configuration will be read from ~/.ssh/config.

Args:
    host: Hostname, IP address, or SSH config alias (e.g., "myserver")
    command: Command to execute
    username: SSH username (optional, will use SSH config or current user)
    password: Password (optional)
    key_filename: Path to SSH key file (optional, will use SSH config)
    port: SSH port (optional, will use SSH config or default 22)

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| command | Yes | | |
| host | Yes | | |
| key_filename | No | | |
| password | No | | |
| port | No | | |
| username | No | | |
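
As a rough illustration of how the schema above maps onto a call, the sketch below builds an MCP `tools/call` request for this tool. The helper name `build_execute_command_request`, the request id, and the example alias, command, and port are assumptions made for illustration. Only `host` and `command` are required, so unset optional fields are left out of the payload.

```python
import json
from typing import Any, Dict

# Optional fields listed in the input schema above.
OPTIONAL_FIELDS = {"username", "password", "key_filename", "port"}


def build_execute_command_request(
    host: str, command: str, request_id: int = 1, **optional: Any
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 `tools/call` request for execute_command.

    Hypothetical helper, not part of the server: it only assembles the payload.
    """
    unknown = set(optional) - OPTIONAL_FIELDS
    if unknown:
        raise ValueError(f"Unknown arguments: {sorted(unknown)}")

    arguments: Dict[str, Any] = {"host": host, "command": command}
    # Drop optional values that were not provided (None).
    arguments.update({k: v for k, v in optional.items() if v is not None})

    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "execute_command", "arguments": arguments},
    }


# "myserver" is the example SSH config alias from the Args description.
request = build_execute_command_request("myserver", "uptime", port=2222)
print(json.dumps(request, indent=2))
```

Leaving `username`, `key_filename`, and `port` unset lets the server fall back to `~/.ssh/config` or its defaults, as described in the Args list.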

Output Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| result | Yes | | |
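
Because the output is a single `result` string, a caller has to infer the outcome from its text. The sketch below classifies that string using the message prefixes produced by the handler listed under Implementation Reference. The names `ToolResult` and `classify_result` are hypothetical, and the matching relies on the current message wording, which could change between versions.

```python
import re
from typing import NamedTuple, Optional


class ToolResult(NamedTuple):
    kind: str                    # "completed", "background", "awaiting_input", or "error"
    exit_status: Optional[int]   # set only for "completed"
    command_id: Optional[str]    # set only for "background" and "awaiting_input"


def classify_result(text: str) -> ToolResult:
    """Classify the text returned by execute_command (hypothetical client helper)."""
    if text.startswith(("Connection Error:", "Error:")):
        return ToolResult("error", None, None)

    # Both the background and the awaiting-input messages carry a "Command ID:" line.
    id_match = re.search(r"^Command ID: (\S+)$", text, flags=re.MULTILINE)
    command_id = id_match.group(1) if id_match else None

    if text.startswith("Command exceeded timeout"):
        return ToolResult("background", None, command_id)
    if text.startswith("Command paused waiting for user input"):
        return ToolResult("awaiting_input", None, command_id)

    status_match = re.match(r"Exit Status: (-?\d+)", text)
    if status_match:
        return ToolResult("completed", int(status_match.group(1)), None)

    # Anything unrecognized is treated as an error rather than guessed at.
    return ToolResult("error", None, None)


done = classify_result("Exit Status: 0\n\nSTDOUT:\nhello\n")
background = classify_result(
    "Command exceeded timeout of 30s and is now running in background.\n\n"
    "Command ID: abc123\n\n"
    "Use get_command_status('abc123') to check progress.\n"
)
print(done)
print(background)
```

A caller could then branch on `kind`, for example following up a `"background"` result with `get_command_status` using the extracted ID.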

Implementation Reference

  • The primary MCP tool handler for 'execute_command'. It is decorated with @mcp.tool(), which both registers the tool and derives its schema from the type hints and docstring. The handler calls SSHSessionManager.execute_command() and formats the response, including the two exit-status-124 cases: a command that moved to the background after the timeout, and a command that is waiting for input.
    @mcp.tool()
    def execute_command(
        host: str,
        command: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        key_filename: Optional[str] = None,
        port: Optional[int] = None,
        enable_password: Optional[str] = None,
        enable_command: str = "enable",
        sudo_password: Optional[str] = None,
        timeout: int = 30
    ) -> str:
        """Execute a command on an SSH host using a persistent session.
        
        Starts synchronously and waits for completion. If the command doesn't complete
        within the timeout, it automatically transitions to async mode and returns a
        command ID for tracking.
    
        The host parameter can be either a hostname/IP or an SSH config alias.
        If an SSH config alias is provided, configuration will be read from ~/.ssh/config.
    
        For network devices (routers, switches), use enable_password to automatically
        enter privileged/enable mode before executing commands.
    
        For Unix/Linux hosts requiring sudo, use sudo_password to automatically handle
        the sudo password prompt. The command will be automatically prefixed with 'sudo'
        if not already present.
    
        Args:
            host: Hostname, IP address, or SSH config alias (e.g., "myserver")
            command: Command to execute
            username: SSH username (optional, will use SSH config or current user)
            password: Password (optional)
            key_filename: Path to SSH key file (optional, will use SSH config)
            port: SSH port (optional, will use SSH config or default 22)
            enable_password: Enable mode password for network devices (optional)
            enable_command: Command to enter enable mode (default: "enable")
            sudo_password: Password for sudo commands on Unix/Linux hosts (optional)
            timeout: Timeout in seconds for command execution (default: 30)
        """
        logger = session_manager.logger.getChild('tool_execute_command')
        logger.info(f"Executing command on {host}: {command[:100]}...")
        
        try:
            stdout, stderr, exit_status = session_manager.execute_command(
                host=host,
                username=username,
                command=command,
                password=password,
                key_filename=key_filename,
                port=port,
                enable_password=enable_password,
                enable_command=enable_command,
                sudo_password=sudo_password,
                timeout=timeout,
            )
        except ConnectionError as e:
            logger.error(f"Connection error: {e}")
            return f"Connection Error: {e}"
        except Exception as e:
            logger.error(f"Exception during execute_command: {e}", exc_info=True)
            return f"Error: {e}"
    
        # Check if command transitioned to async mode
        if exit_status == 124:
            if stderr.startswith("ASYNC:"):
                command_id = stderr.split(":", 1)[1]
                response = (
                    f"Command exceeded timeout of {timeout}s and is now running in background.\n\n"
                    f"Command ID: {command_id}\n\n"
                    f"Use get_command_status('{command_id}') to check progress.\n"
                    f"Use interrupt_command_by_id('{command_id}') to stop it."
                )
                logger.warning(f"Command timed out, returning async response for command_id {command_id}")
                return response
                
            if stderr.startswith("AWAITING_INPUT:"):
                # Format: AWAITING_INPUT:command_id:reason
                parts = stderr.split(":", 2)
                command_id = parts[1]
                reason = parts[2] if len(parts) > 2 else "unknown"
                
                response = (
                    f"Command paused waiting for user input ({reason}).\n\n"
                    f"Command ID: {command_id}\n\n"
                    f"Use send_input('{command_id}', 'your_input\\n') to provide input.\n"
                    f"For example, if it's a password, provide the password followed by \\n."
                )
                logger.info(f"Command awaiting input, returning instructions for command_id {command_id}")
                return response
    
        result = f"Exit Status: {exit_status}\n\n"
        if stdout:
            result += f"STDOUT:\n{stdout}\n"
        if stderr:
            result += f"STDERR:\n{stderr}\n"
    
        logger.info(f"Command finished with exit status {exit_status}.")
        logger.debug(f"Returning result:\n{result}")
        return result
  • Core execution logic in CommandExecutor.execute_command(), called by SSHSessionManager.execute_command(). It starts the command asynchronously via execute_command_async() and polls for completion. If the command is awaiting input, it returns immediately; if the timeout elapses, it leaves the command running in the background and returns the command ID.
    def execute_command(self, host: str, username: Optional[str] = None,
                       command: str = "", password: Optional[str] = None,
                       key_filename: Optional[str] = None,
                       port: Optional[int] = None,
                       enable_password: Optional[str] = None,
                       enable_command: str = "enable",
                       sudo_password: Optional[str] = None,
                       timeout: int = 30) -> tuple[str, str, int]:
        """Execute a command on a host using persistent session."""
        logger = self.logger.getChild('execute_command')
        logger.info(f"[EXEC_REQ] host={host}, cmd={command[:100]}..., timeout={timeout}")
    
        # Validate command
        is_valid, error_msg = self._session_manager._command_validator.validate_command(command)
        if not is_valid:
            logger.warning(f"[EXEC_INVALID] {error_msg}")
            return "", error_msg, 1
    
        # Start async
        logger.debug("[EXEC_ASYNC_START] Starting async execution")
        try:
            command_id = self.execute_command_async(
                host, username, command, password, key_filename, port,
                sudo_password, enable_password, enable_command, timeout
            )
        except Exception as e:
            return "", str(e), 1
            
        logger.debug(f"[EXEC_ASYNC_ID] command_id={command_id}")
    
        # Poll until done or timeout
        start = time.time()
        poll_count = 0
        idle_threshold = getattr(self._session_manager, 'SYNC_IDLE_TO_ASYNC', 0)
        last_activity = start
        last_stdout = ""
        last_stderr = ""
        while time.time() - start < timeout:
            status = self.get_command_status(command_id)
            poll_count += 1
    
            if 'error' in status:
                logger.error(f"[EXEC_ERROR] {status['error']}")
                return "", status['error'], 1
            
            if status['status'] == 'awaiting_input':
                reason = status.get('awaiting_input_reason', 'unknown')
                logger.info(f"[EXEC_AWAIT] Command {command_id} waiting for input: {reason}")
                # Return immediately for awaiting input, let the tool handle it.
                return "", f"AWAITING_INPUT:{command_id}:{reason}", 124
                
            if status['status'] != 'running':
                logger.info(f"[EXEC_DONE] status={status['status']}, polls={poll_count}, duration={time.time() - start:.2f}s")
                return status['stdout'], status['stderr'], status['exit_code'] or 0
    
            # Still running. Even if the internal worker has hit its idle threshold
            # (SYNC_IDLE_TO_ASYNC) and switched to async mode, this synchronous call
            # keeps polling the same command ID until the outer `timeout` elapses.
    
            time.sleep(0.1)
    
        # If we reach here, the command genuinely timed out based on the outer `timeout` parameter.
        logger.warning(f"[EXEC_TIMEOUT] Command {command_id} timed out after {timeout}s")
        status_on_timeout = self.get_command_status(command_id)
        return status_on_timeout.get('stdout', ''), f"ASYNC:{command_id}", 124
  • SSHSessionManager.execute_command() is a thin wrapper that delegates to self.command_executor.execute_command(). It is the entry point used by the MCP handler.
    def execute_command(self, host: str, username: Optional[str] = None,
                       command: str = "", password: Optional[str] = None,
                       key_filename: Optional[str] = None,
                       port: Optional[int] = None,
                       enable_password: Optional[str] = None,
                       enable_command: str = "enable",
                       sudo_password: Optional[str] = None,
                       timeout: int = 30) -> tuple[str, str, int]:
        """Execute a command on a host using persistent session."""
        return self.command_executor.execute_command(
            host, username, command, password, key_filename, port,
            enable_password, enable_command, sudo_password, timeout
        )
  • The @mcp.tool() decorator registers the execute_command function as an MCP tool with the name 'execute_command'.
    @mcp.tool()
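
The polling convention in the listings above can be sketched in isolation. The code below is a simplified, self-contained approximation and not the project's implementation: `poll_until_done` and `make_fake_status` are hypothetical names, the status source is a stub, and `time.monotonic()` is used in place of `time.time()` so the deadline is unaffected by clock adjustments. It keeps the same return convention: a `(stdout, stderr, exit_status)` tuple in which exit status 124 plus an `ASYNC:` or `AWAITING_INPUT:` prefix in stderr means the command is still alive.

```python
import time
from typing import Any, Callable, Dict, Tuple

Status = Dict[str, Any]


def poll_until_done(
    command_id: str,
    get_status: Callable[[str], Status],
    timeout: float,
    interval: float = 0.1,
) -> Tuple[str, str, int]:
    """Poll get_status until the command stops running or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status(command_id)
        if "error" in status:
            return "", str(status["error"]), 1
        if status["status"] == "awaiting_input":
            # Return right away so the caller can supply the input.
            reason = status.get("awaiting_input_reason", "unknown")
            return "", f"AWAITING_INPUT:{command_id}:{reason}", 124
        if status["status"] != "running":
            return status["stdout"], status["stderr"], status["exit_code"] or 0
        time.sleep(interval)

    # Timed out: return any partial output and the ID for later tracking.
    partial = get_status(command_id).get("stdout", "")
    return partial, f"ASYNC:{command_id}", 124


def make_fake_status(finish_after_polls: int) -> Callable[[str], Status]:
    """Hypothetical stand-in for a real status lookup: completes after N polls."""
    calls = {"n": 0}

    def get_status(command_id: str) -> Status:
        calls["n"] += 1
        if calls["n"] >= finish_after_polls:
            return {"status": "completed", "stdout": "done\n", "stderr": "", "exit_code": 0}
        return {"status": "running", "stdout": "", "stderr": "", "exit_code": None}

    return get_status


finished = poll_until_done("cmd-1", make_fake_status(3), timeout=2.0, interval=0.01)
timed_out = poll_until_done("cmd-2", make_fake_status(10**9), timeout=0.05, interval=0.01)
print(finished)
print(timed_out)
```

Encoding the command ID in stderr keeps the three-element tuple unchanged for every outcome. The cost is that a command which itself exits with 124 has to be told apart by the stderr prefix, which is what the handler's `startswith` checks do.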

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/devnullvoid/mcp-ssh-session'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.