execute_command
Execute local or SSH shell commands to verify Java environment, view process status, and support memory shell detection with auxiliary operations.
Instructions
执行系统命令(本地或通过 SSH 远程执行)
这是一个通用的命令执行工具,可用于:
检查 Java 环境是否正常(java -version)
查看系统进程状态(ps aux)
执行其他辅助命令
注意:内存马检测的核心功能请使用专用工具(list_java_processes、scan_process 等), 此工具仅用于辅助操作。
Args: command: 要执行的 shell 命令 use_ssh: 是否使用 SSH 远程执行 ssh_host: SSH 主机地址(不指定则从环境变量 SSH_HOST 读取) ssh_username: SSH 用户名(不指定则从环境变量 SSH_USERNAME 读取) ssh_password: SSH 密码(不指定则从环境变量 SSH_PASSWORD 读取) ssh_key_path: SSH 私钥路径(不指定则从环境变量 SSH_KEY_PATH 读取) ssh_port: SSH 端口,默认 22(不指定则从环境变量 SSH_PORT 读取) timeout: 命令超时时间(秒),默认 300 秒
Returns: 执行结果,包含 success、stdout、stderr、return_code
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| command | Yes | ||
| use_ssh | No | ||
| ssh_host | No | ||
| ssh_username | No | ||
| ssh_password | No | ||
| ssh_key_path | No | ||
| ssh_port | No | ||
| timeout | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/memory_shell_mcp/__init__.py:267-267 (registration)Tool registration using @mcp.tool() decorator on execute_command function
@mcp.tool() - src/memory_shell_mcp/__init__.py:268-327 (handler)Main handler function for the 'execute_command' tool. Accepts a command string and optional SSH parameters, dispatches to local or SSH execution.
def execute_command( command: str, use_ssh: bool = False, ssh_host: Optional[str] = None, ssh_username: Optional[str] = None, ssh_password: Optional[str] = None, ssh_key_path: Optional[str] = None, ssh_port: int = 22, timeout: int = 300 ) -> dict: """ 执行系统命令(本地或通过 SSH 远程执行) 这是一个通用的命令执行工具,可用于: - 检查 Java 环境是否正常(java -version) - 查看系统进程状态(ps aux) - 执行其他辅助命令 注意:内存马检测的核心功能请使用专用工具(list_java_processes、scan_process 等), 此工具仅用于辅助操作。 Args: command: 要执行的 shell 命令 use_ssh: 是否使用 SSH 远程执行 ssh_host: SSH 主机地址(不指定则从环境变量 SSH_HOST 读取) ssh_username: SSH 用户名(不指定则从环境变量 SSH_USERNAME 读取) ssh_password: SSH 密码(不指定则从环境变量 SSH_PASSWORD 读取) ssh_key_path: SSH 私钥路径(不指定则从环境变量 SSH_KEY_PATH 读取) ssh_port: SSH 端口,默认 22(不指定则从环境变量 SSH_PORT 读取) timeout: 命令超时时间(秒),默认 300 秒 Returns: 执行结果,包含 success、stdout、stderr、return_code """ if use_ssh: ssh_config = get_ssh_config() ssh_host = ssh_host or ssh_config["host"] ssh_username = ssh_username or ssh_config["username"] ssh_password = ssh_password or ssh_config["password"] ssh_key_path = ssh_key_path or ssh_config["key_path"] ssh_port = ssh_port if ssh_port != 22 else ssh_config["port"] if not ssh_host or not ssh_username: return { "success": False, "stdout": "", "stderr": "SSH模式需要提供ssh_host和ssh_username,或设置SSH_HOST和SSH_USERNAME环境变量", "return_code": -1 } return execute_ssh_command( host=ssh_host, username=ssh_username, command=command, password=ssh_password, key_path=ssh_key_path, port=ssh_port, timeout=timeout ) else: return execute_local_command(command, timeout) - Helper function execute_local_command - runs command via subprocess.run with shell=True and timeout support
def execute_local_command(command: str, timeout: int = 300) -> dict: """本地执行命令""" try: result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=timeout ) return { "success": result.returncode == 0, "stdout": result.stdout, "stderr": result.stderr, "return_code": result.returncode } except subprocess.TimeoutExpired: return { "success": False, "stdout": "", "stderr": f"命令执行超时({timeout}秒)", "return_code": -1 } except Exception as e: return { "success": False, "stdout": "", "stderr": str(e), "return_code": -1 } - Helper function execute_ssh_command - runs command remotely via paramiko SSH client
def execute_ssh_command( host: str, username: str, command: str, password: Optional[str] = None, key_path: Optional[str] = None, port: int = 22, timeout: int = 300 ) -> dict: """SSH远程执行命令""" try: import paramiko client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) connect_kwargs = { "hostname": host, "port": port, "username": username, "timeout": 30 } if key_path and os.path.exists(key_path): connect_kwargs["key_filename"] = key_path elif password: connect_kwargs["password"] = password else: return { "success": False, "stdout": "", "stderr": "需要提供密码或SSH密钥路径", "return_code": -1 } client.connect(**connect_kwargs) stdin, stdout, stderr = client.exec_command(command, timeout=timeout) stdout_str = stdout.read().decode('utf-8', errors='replace') stderr_str = stderr.read().decode('utf-8', errors='replace') return_code = stdout.channel.recv_exit_status() client.close() return { "success": return_code == 0, "stdout": stdout_str, "stderr": stderr_str, "return_code": return_code } except Exception as e: return { "success": False, "stdout": "", "stderr": str(e), "return_code": -1 } - Helper function get_ssh_config - reads SSH connection settings from environment variables
def get_ssh_config() -> dict: """从环境变量获取SSH配置""" return { "host": os.environ.get("SSH_HOST"), "username": os.environ.get("SSH_USERNAME"), "password": os.environ.get("SSH_PASSWORD"), "key_path": os.environ.get("SSH_KEY_PATH"), "port": int(os.environ.get("SSH_PORT", 22)) }