download_detector_tools
Downloads the required Java agent and CLI jar files to initiate memory shell detection and scanning. Supports local and remote SSH deployment.
Instructions
下载 Java 内存马检测工具包(detector-agent.jar 和 detector-cli.jar)
此工具会下载两个核心 jar 包:
detector-agent-1.0.0-SNAPSHOT.jar: Java Agent,用于注入目标 JVM 进程
memory-shell-detector-cli.jar: 命令行工具,提供扫描、反编译、移除等功能
这是使用内存马检测功能的前置步骤,下载完成后才能执行后续的扫描和分析操作。
Args:
- tools_dir: 工具存放目录,不指定则从环境变量 TOOLS_DIR 读取,都没有则使用系统临时目录
- use_ssh: 是否在远程服务器上下载
- ssh_host: SSH 主机地址(不指定则从环境变量 SSH_HOST 读取)
- ssh_username: SSH 用户名(不指定则从环境变量 SSH_USERNAME 读取)
- ssh_password: SSH 密码(不指定则从环境变量 SSH_PASSWORD 读取)
- ssh_key_path: SSH 私钥路径(不指定则从环境变量 SSH_KEY_PATH 读取)
- ssh_port: SSH 端口(不指定则从环境变量 SSH_PORT 读取)
Returns: 下载结果,包含工具目录路径和 jar 文件名
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
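| tools_dir | No | Directory where the tools are stored; falls back to the TOOLS_DIR environment variable, then to the system temporary directory | None |
| use_ssh | No | Whether to download on a remote server over SSH | False |
| ssh_host | No | SSH host address (read from the SSH_HOST environment variable when omitted) | None |
| ssh_username | No | SSH user name (read from the SSH_USERNAME environment variable when omitted) | None |
| ssh_password | No | SSH password (read from the SSH_PASSWORD environment variable when omitted) | None |
| ssh_key_path | No | Path to the SSH private key (read from the SSH_KEY_PATH environment variable when omitted) | None |
| ssh_port | No | SSH port (read from the SSH_PORT environment variable when omitted) | 22 |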
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- src/memory_shell_mcp/__init__.py:330-462 (handler)The handler function for the 'download_detector_tools' MCP tool. Decorated with @mcp.tool(), it downloads Java memory shell detector jar files (detector-agent-1.0.0-SNAPSHOT.jar and memory-shell-detector-cli.jar). Supports both local and SSH modes, checks if tools already exist, validates network availability, and downloads from predefined URLs.
@mcp.tool()
def download_detector_tools(
    tools_dir: Optional[str] = None,
    use_ssh: bool = False,
    ssh_host: Optional[str] = None,
    ssh_username: Optional[str] = None,
    ssh_password: Optional[str] = None,
    ssh_key_path: Optional[str] = None,
    ssh_port: int = 22
) -> dict:
    """Download the Java memory-shell detector toolkit (agent jar and CLI jar).

    Two core jars are downloaded:
    - detector-agent-1.0.0-SNAPSHOT.jar: the Java agent that is injected into
      the target JVM process.
    - memory-shell-detector-cli.jar: the command-line tool providing scan,
      decompile and removal features.

    This is the prerequisite step for memory-shell detection; the scan and
    analysis tools can only be used after the download has completed.

    Args:
        tools_dir: Directory to store the tools in. When omitted, the
            TOOLS_DIR environment variable is used, and failing that the
            system temporary directory (``/tmp`` on the remote host in SSH
            mode). The value is used literally: in SSH mode it is
            shell-quoted, so ``~`` and ``$VAR`` are not expanded.
        use_ssh: Whether to download on a remote server over SSH.
        ssh_host: SSH host address (read from SSH_HOST when omitted).
        ssh_username: SSH user name (read from SSH_USERNAME when omitted).
        ssh_password: SSH password (read from SSH_PASSWORD when omitted).
        ssh_key_path: SSH private key path (read from SSH_KEY_PATH when
            omitted).
        ssh_port: SSH port (read from SSH_PORT when omitted).

    Returns:
        A dict with ``success``, ``message`` and ``tools_dir``. On success it
        also carries ``agent_jar`` and ``cli_jar`` (the jar file names); on
        failure ``tools_dir`` is ``None``.
    """
    # Function-scope import so the block does not rely on a file-level import.
    import shlex

    agent_jar_name = "detector-agent-1.0.0-SNAPSHOT.jar"
    cli_jar_name = "memory-shell-detector-cli.jar"

    def failure(message: str) -> dict:
        return {"success": False, "message": message, "tools_dir": None}

    def success(message: str) -> dict:
        return {
            "success": True,
            "message": message,
            "tools_dir": target_dir,
            "agent_jar": agent_jar_name,
            "cli_jar": cli_jar_name,
        }

    def run_remote(command: str, **extra) -> dict:
        # Reads the SSH parameters at call time, i.e. after they were resolved.
        return execute_ssh_command(
            host=ssh_host,
            username=ssh_username,
            command=command,
            password=ssh_password,
            key_path=ssh_key_path,
            port=ssh_port,
            **extra,
        )

    def has_content(path: str) -> bool:
        # A failed `wget -O` leaves a zero-byte file behind; such a file must
        # not be mistaken for a downloaded jar.
        try:
            return os.path.getsize(path) > 0
        except OSError:
            return False

    if use_ssh:
        ssh_host, ssh_username, ssh_password, ssh_key_path, ssh_port = resolve_ssh_params(
            ssh_host, ssh_username, ssh_password, ssh_key_path, ssh_port
        )
        if not ssh_host or not ssh_username:
            return failure("SSH模式需要提供ssh_host和ssh_username,或设置SSH_HOST和SSH_USERNAME环境变量")

    # Explicit argument first, then the environment, then a temp-dir default.
    target_dir = tools_dir or os.environ.get("TOOLS_DIR")
    if not target_dir:
        if use_ssh:
            target_dir = "/tmp/memory-shell-detector"
        else:
            target_dir = os.path.join(get_temp_dir(), "memory-shell-detector")

    if use_ssh:
        # The remote commands below are POSIX shell, so join with "/".
        agent_path = f"{target_dir}/{agent_jar_name}"
        cli_path = f"{target_dir}/{cli_jar_name}"
    else:
        agent_path = os.path.join(target_dir, agent_jar_name)
        cli_path = os.path.join(target_dir, cli_jar_name)

    # Skip the download when both jars are already present and non-empty.
    if use_ssh:
        check_cmd = " && ".join(
            [
                f"test -s {shlex.quote(agent_path)}",
                f"test -s {shlex.quote(cli_path)}",
                'echo "exists"',
            ]
        )
        result = run_remote(check_cmd)
        if "exists" in result["stdout"]:
            return success("工具已存在,无需下载")
    elif has_content(agent_path) and has_content(cli_path):
        return success("工具已存在,无需下载")

    # Connectivity is only probed for local downloads.
    if not use_ssh:
        network_check = check_network_available()
        if not network_check["available"]:
            return failure(f"网络检测失败: {network_check['message']}")

    downloads = (
        ("detector-agent", DETECTOR_AGENT_URL, agent_path),
        ("detector-cli", DETECTOR_CLI_URL, cli_path),
    )

    if use_ssh:
        result = run_remote(f"mkdir -p {shlex.quote(target_dir)}")
        if not result["success"]:
            return failure(f"创建目录失败: {result['stderr']}")

        for label, url, path in downloads:
            quoted_path = shlex.quote(path)
            quoted_url = shlex.quote(url)
            # -f makes curl exit non-zero on HTTP errors instead of saving the
            # error page as the jar, so the wget fallback can take over.
            download_cmd = (
                f"curl -fL -o {quoted_path} {quoted_url} "
                f"|| wget -O {quoted_path} {quoted_url}"
            )
            result = run_remote(download_cmd, timeout=120)
            if not result["success"]:
                return failure(f"下载{label}失败: {result['stderr']}")
    else:
        os.makedirs(target_dir, exist_ok=True)

        for label, url, path in downloads:
            result = execute_local_command(get_download_command(url, path), timeout=120)
            # A command reported as failed is still tolerated when it left a
            # non-empty file behind (same leniency as before, minus empty files).
            if not result["success"] and not has_content(path):
                return failure(f"下载{label}失败: {result['stderr']}")

    return success("工具下载完成")


# - Function signature / input schema for download_detector_tools. Parameters: tools_dir (optional), use_ssh (bool), ssh_host, ssh_username, ssh_password, ssh_key_path, ssh_port. Returns a dict with success, message, tools_dir, agent_jar, cli_jar.
def download_detector_tools( tools_dir: Optional[str] = None, use_ssh: bool = False, ssh_host: Optional[str] = None, ssh_username: Optional[str] = None, ssh_password: Optional[str] = None, ssh_key_path: Optional[str] = None, ssh_port: int = 22 ) -> dict: - src/memory_shell_mcp/__init__.py:330-330 (registration)The '@mcp.tool()' decorator registers download_detector_tools as an MCP tool on the FastMCP instance named 'memory-shell-detector'.
@mcp.tool() - Helper function get_download_command() used by download_detector_tools to construct the OS-appropriate download command (curl/wget for Unix, curl/powershell for Windows).
def get_download_command(url: str, output_path: str) -> str:
    """Build the shell command that downloads ``url`` to ``output_path``.

    The command tries ``curl`` first and falls back to a second downloader:
    PowerShell's ``Invoke-WebRequest`` on Windows, ``wget`` elsewhere.

    Args:
        url: Source URL of the file.
        output_path: Destination file path.

    Returns:
        A single shell command string, selected by ``platform.system()``.
    """
    # Function-scope import so the block does not rely on a file-level import.
    import shlex

    if platform.system().lower() == "windows":
        # cmd.exe does not follow POSIX quoting rules, so the double-quoted
        # form is kept here. -f makes curl fail on HTTP errors so that the
        # PowerShell fallback actually runs.
        return (
            f'curl -fL -o "{output_path}" "{url}" || powershell -Command '
            f'"Invoke-WebRequest -Uri \'{url}\' -OutFile \'{output_path}\'"'
        )

    # shlex.quote keeps spaces, quotes, `$(...)` and backticks in either value
    # from being interpreted by the shell.
    quoted_path = shlex.quote(output_path)
    quoted_url = shlex.quote(url)
    # -f: exit non-zero on HTTP >= 400 instead of saving the error page, which
    # is what lets the wget fallback run.
    return f"curl -fL -o {quoted_path} {quoted_url} || wget -O {quoted_path} {quoted_url}"


# - Helper function check_network_available() used by download_detector_tools to verify network connectivity before attempting downloads.
def check_network_available(test_url: str = "https://xget.xi-xu.me") -> dict:
    """Check whether the network is usable by probing ``test_url``.

    A platform-specific probe command (curl first, then PowerShell on Windows
    or wget elsewhere) is run through ``execute_local_command``.

    Args:
        test_url: URL to probe.

    Returns:
        A dict with ``available`` (bool) and ``message`` (str).
    """
    on_windows = platform.system().lower() == "windows"
    if on_windows:
        probe = f'curl -s -o nul -w "%{{http_code}}" --connect-timeout 10 "{test_url}" || powershell -Command "(Invoke-WebRequest -Uri \'{test_url}\' -TimeoutSec 10 -UseBasicParsing).StatusCode"'
    else:
        # NOTE(review): `||` and `&&` have equal precedence in POSIX sh, so
        # `echo "200"` also runs when curl itself exits 0.
        probe = f'curl -s -o /dev/null -w "%{{http_code}}" --connect-timeout 10 "{test_url}" 2>/dev/null || wget -q --spider --timeout=10 "{test_url}" && echo "200"'

    outcome = execute_local_command(probe, timeout=30)

    # Reachable when the command succeeded or its output mentions 200.
    reachable = outcome["success"] or "200" in outcome["stdout"]
    if not reachable:
        return {"available": False, "message": f"网络连接失败: {outcome['stderr']}"}
    return {"available": True, "message": "网络连接正常"}