remove_memory_shell
Remove a specified memory shell class from a running JVM process without restart, after AI analysis confirms it is malicious.
Instructions
执行 memory-shell-detector-cli.jar 从 JVM 内存中移除指定的内存马类
底层命令: java -jar memory-shell-detector-cli.jar -r <class_name> -p
此工具通过 Java Agent 技术从运行中的 JVM 进程中卸载/禁用指定的恶意类, 实现不重启服务的情况下清除内存马。
移除机制:
对于 Filter/Servlet/Listener:从 Web 容器中注销
对于 Spring 组件:从 Spring 容器中移除 Bean
对于 Agent 类型:尝试还原被 hook 的方法
安全机制:首次调用时会先反编译目标类源码供 AI 分析确认, 确认是内存马后需设置 ai_confirmed=True 再次调用才会执行移除。
Args: class_name: 要移除的内存马完整类名 pid: 目标 Java 进程的 PID tools_dir: 检测工具 jar 包所在目录 ai_confirmed: AI 是否已确认该类为内存马(首次调用设为 False) use_ssh: 是否通过 SSH 在远程服务器执行 ssh_host/ssh_username/ssh_password/ssh_key_path/ssh_port: SSH 连接参数
Returns: 首次调用返回反编译源码供分析,确认后返回移除结果
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| class_name | Yes | ||
| pid | Yes | ||
| tools_dir | No | ||
| ai_confirmed | No | ||
| use_ssh | No | ||
| ssh_host | No | ||
| ssh_username | No | ||
| ssh_password | No | ||
| ssh_key_path | No | ||
| ssh_port | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/memory_shell_mcp/__init__.py:631-718 (handler)The main handler function for the remove_memory_shell tool. Uses Java CLI jar to remove a memory shell class from a JVM process. Has a two-step safety mechanism: first call (ai_confirmed=False) shows decompiled source code for AI analysis, second call (ai_confirmed=True) executes the actual removal via 'java -jar cli.jar -r class_name -p pid'. Supports both local execution and SSH remote execution.
@mcp.tool() def remove_memory_shell( class_name: str, pid: int, tools_dir: Optional[str] = None, ai_confirmed: bool = False, use_ssh: bool = False, ssh_host: Optional[str] = None, ssh_username: Optional[str] = None, ssh_password: Optional[str] = None, ssh_key_path: Optional[str] = None, ssh_port: int = 22 ) -> dict: """ 执行 memory-shell-detector-cli.jar 从 JVM 内存中移除指定的内存马类 底层命令: java -jar memory-shell-detector-cli.jar -r <class_name> -p <pid> 此工具通过 Java Agent 技术从运行中的 JVM 进程中卸载/禁用指定的恶意类, 实现不重启服务的情况下清除内存马。 移除机制: - 对于 Filter/Servlet/Listener:从 Web 容器中注销 - 对于 Spring 组件:从 Spring 容器中移除 Bean - 对于 Agent 类型:尝试还原被 hook 的方法 安全机制:首次调用时会先反编译目标类源码供 AI 分析确认, 确认是内存马后需设置 ai_confirmed=True 再次调用才会执行移除。 Args: class_name: 要移除的内存马完整类名 pid: 目标 Java 进程的 PID tools_dir: 检测工具 jar 包所在目录 ai_confirmed: AI 是否已确认该类为内存马(首次调用设为 False) use_ssh: 是否通过 SSH 在远程服务器执行 ssh_host/ssh_username/ssh_password/ssh_key_path/ssh_port: SSH 连接参数 Returns: 首次调用返回反编译源码供分析,确认后返回移除结果 """ if use_ssh: ssh_host, ssh_username, ssh_password, ssh_key_path, ssh_port = resolve_ssh_params( ssh_host, ssh_username, ssh_password, ssh_key_path, ssh_port ) if not ssh_host or not ssh_username: return {"success": False, "action": "错误", "message": "SSH模式需要提供ssh_host和ssh_username,或设置SSH_HOST和SSH_USERNAME环境变量"} if not tools_dir: tools_dir = os.environ.get("TOOLS_DIR") if not tools_dir: return {"success": False, "action": "错误", "message": "未指定tools_dir,请先调用download_detector_tools或设置TOOLS_DIR环境变量"} cli_jar = os.path.join(tools_dir, "memory-shell-detector-cli.jar") if not use_ssh else f"{tools_dir}/memory-shell-detector-cli.jar" # SSH 连接到远程服务器通常是 Linux,本地执行根据当前系统判断 is_windows = not use_ssh and platform.system().lower() == "windows" escaped_class_name = escape_class_name(class_name, for_windows=is_windows) if not ai_confirmed: view_cmd = f'java -jar "{cli_jar}" -v {escaped_class_name} -p {pid}' if use_ssh: result = execute_ssh_command(host=ssh_host, username=ssh_username, command=view_cmd, password=ssh_password, key_path=ssh_key_path, port=ssh_port) else: result = execute_local_command(view_cmd) return { "success": True, "action": "需要AI确认", "message": "请分析以下源代码,确认是否为内存马。如果确认是内存马,请再次调用此工具并设置ai_confirmed=True", "source_code": result["stdout"], "class_name": class_name, "pid": pid } remove_cmd = f'echo "y" | java -jar "{cli_jar}" -r {escaped_class_name} -p {pid}' if use_ssh: result = execute_ssh_command(host=ssh_host, username=ssh_username, command=remove_cmd, password=ssh_password, key_path=ssh_key_path, port=ssh_port) else: result = execute_local_command(remove_cmd) return { "success": result["success"], "action": "移除内存马", "result": result["stdout"], "error": result["stderr"] if not result["success"] else None, "class_name": class_name, "pid": pid } - src/memory_shell_mcp/__init__.py:631-643 (registration)The tool is registered via the @mcp.tool() decorator on the FastMCP instance named 'memory-shell-detector' (line 26). The FastMCP framework automatically registers the function as an MCP tool.
@mcp.tool() def remove_memory_shell( class_name: str, pid: int, tools_dir: Optional[str] = None, ai_confirmed: bool = False, use_ssh: bool = False, ssh_host: Optional[str] = None, ssh_username: Optional[str] = None, ssh_password: Optional[str] = None, ssh_key_path: Optional[str] = None, ssh_port: int = 22 ) -> dict: - Helper function used by remove_memory_shell to escape Java inner class names ($ separator) appropriately for the shell environment (Windows vs Unix).
def escape_class_name(class_name: str, for_windows: bool = False) -> str: """ 转义类名中的特殊字符,防止 shell 解释 Java 内部类使用 $ 分隔符(如 com.example.Outer$Inner), 但 $ 在 Unix shell 中是变量引用符号,需要转义。 Windows cmd/PowerShell 中 $ 不需要转义。 Args: class_name: Java 完整类名 for_windows: 是否为 Windows 系统 Returns: 转义后的类名 """ if for_windows: # Windows cmd 中 $ 不是特殊字符,无需转义 return class_name else: # Unix shell (bash/zsh) 中需要转义 $ return class_name.replace("$", "\\$") - Helper function used by remove_memory_shell to resolve SSH connection parameters, falling back to environment variables if not provided directly.
def resolve_ssh_params( ssh_host: Optional[str], ssh_username: Optional[str], ssh_password: Optional[str], ssh_key_path: Optional[str], ssh_port: int ) -> tuple: """解析SSH参数,优先使用传入值,否则从环境变量读取""" ssh_config = get_ssh_config() return ( ssh_host or ssh_config["host"], ssh_username or ssh_config["username"], ssh_password or ssh_config["password"], ssh_key_path or ssh_config["key_path"], ssh_port if ssh_port != 22 else ssh_config["port"] ) - Helper function used by remove_memory_shell to execute commands locally via subprocess. Used for both the view/decompile step (ai_confirmed=False) and the removal step (ai_confirmed=True).
def execute_local_command(command: str, timeout: int = 300) -> dict: """本地执行命令""" try: result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=timeout ) return { "success": result.returncode == 0, "stdout": result.stdout, "stderr": result.stderr, "return_code": result.returncode } except subprocess.TimeoutExpired: return { "success": False, "stdout": "", "stderr": f"命令执行超时({timeout}秒)", "return_code": -1 } except Exception as e: return { "success": False, "stdout": "", "stderr": str(e), "return_code": -1 }