security_vulnerability_scan
Perform basic security vulnerability scans on servers to identify potential weaknesses in network interfaces, services, and firewall configurations.
Instructions
执行基础安全漏洞扫描
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| hostname | Yes | | |
| username | Yes | | |
| password | No | | "" |
| port | No | | 22 |
| scan_type | No | | basic |
| timeout | No | | 60 |
Implementation Reference
# Core handler function implementing basic security vulnerability scan via SSH: checks password policies, SSH config, sudo permissions, empty passwords, and available security updates.
def security_vulnerability_scan(
    hostname: str,
    username: str,
    password: str = "",
    port: int = 22,
    scan_type: str = "basic",  # basic, sshd, packages, all
    timeout: int = 60
) -> dict:
    """Run a basic security vulnerability scan on a remote host over SSH.

    Args:
        hostname: Host to connect to.
        username: SSH login name.
        password: SSH password (empty string by default).
        port: SSH port.
        scan_type: Which group of checks to run: ``"basic"`` (password
            ageing, empty-password accounts, NOPASSWD sudo rules),
            ``"sshd"`` (sshd_config settings), ``"packages"`` (pending
            security updates) or ``"all"`` for every group.
        timeout: Timeout in seconds passed to SSHManager and to every
            remote command.

    Returns:
        A dict with the keys ``status`` (``"success"`` or ``"error"``),
        ``vulnerabilities`` (list of finding dicts with ``type``, ``level``,
        ``description``, ``details`` and ``recommendation``), ``summary``
        (human-readable string) and ``error`` (message, empty on success).
        Failures are reported through the dict; no exception is propagated.
    """
    result = {"status": "unknown", "vulnerabilities": [], "summary": "", "error": ""}

    # Reject unknown scan types before connecting: otherwise no check would
    # run and the caller would get a misleading "no vulnerabilities" report.
    valid_scan_types = ("basic", "sshd", "packages", "all")
    if scan_type not in valid_scan_types:
        allowed = ", ".join(valid_scan_types)
        result["status"] = "error"
        result["error"] = f"不支持的 scan_type: {scan_type!r} (可选: {allowed})"
        return result

    run_all = scan_type == "all"
    vulnerabilities = []

    def add_issue(level, description, details, recommendation):
        # Every finding shares the same shape and key order.
        vulnerabilities.append({
            "type": "security_issue",
            "level": level,
            "description": description,
            "details": details,
            "recommendation": recommendation
        })

    def truncate(text, limit=200):
        # Keep package listings short enough for a report entry.
        return text[:limit] + ("..." if len(text) > limit else "")

    try:
        with SSHManager(hostname, username, password, port, timeout) as ssh:

            def run(command):
                # Execute a remote command and return its stripped stdout.
                # Undecodable bytes are replaced instead of aborting the scan.
                _stdin, stdout, _stderr = ssh.exec_command(command, timeout=timeout)
                return stdout.read().decode(errors="replace").strip()

            def has_command(name):
                # The lookup's own output is discarded: `which X && echo found`
                # prints the binary path first, so comparing the whole output
                # with "found" could never succeed.
                probe = f"command -v {name} >/dev/null 2>&1 && echo found || echo not found"
                return run(probe) == "found"

            # Basic account and privilege checks
            if scan_type == "basic" or run_all:
                # Password ageing policy
                passwd_policy = run(
                    "grep -E '^PASS_MAX_DAYS|^PASS_MIN_DAYS|^PASS_WARN_AGE' /etc/login.defs"
                )
                max_days = None
                for line in passwd_policy.splitlines():
                    fields = line.split()
                    if len(fields) >= 2 and fields[0] == "PASS_MAX_DAYS" and fields[1].isdigit():
                        max_days = int(fields[1])  # keep the last definition found
                if max_days is not None and max_days > 90:
                    add_issue(
                        "medium",
                        f"密码最长使用天数过长: {max_days}天",
                        f"PASS_MAX_DAYS {max_days}",
                        "将 PASS_MAX_DAYS 设置为 90 天或更短"
                    )

                # Accounts with an empty password field in /etc/shadow
                empty_passwd_accounts = run("grep -E '^[^:]+::' /etc/shadow")
                if empty_passwd_accounts:
                    add_issue(
                        "critical",
                        "存在空密码账户",
                        empty_passwd_accounts,
                        "为所有账户设置强密码"
                    )

                # sudo rules granting ALL=(ALL); `|| true` keeps the command
                # from failing when nothing matches.
                sudo_all = run(
                    "grep -E '^[^#].*ALL=\\(ALL\\)' /etc/sudoers /etc/sudoers.d/* 2>/dev/null || true"
                )
                if sudo_all and "NOPASSWD" in sudo_all:
                    add_issue(
                        "high",
                        "存在无需密码的sudo权限",
                        sudo_all,
                        "移除NOPASSWD选项,要求输入密码"
                    )

            # SSH daemon configuration checks
            if scan_type == "sshd" or run_all:
                sshd_config = run(
                    "grep -E '^PasswordAuthentication|^PermitRootLogin|^PermitEmptyPasswords|^X11Forwarding' /etc/ssh/sshd_config"
                )
                # Tokenise each line so tabs or repeated spaces between the
                # keyword and its value are handled; the first occurrence of
                # a keyword is the one kept.
                sshd_settings = {}
                for line in sshd_config.splitlines():
                    fields = line.split()
                    if len(fields) >= 2:
                        sshd_settings.setdefault(fields[0], fields[1])

                if sshd_settings.get("PermitRootLogin") == "yes":
                    add_issue(
                        "high",
                        "允许SSH直接登录root账户",
                        "PermitRootLogin yes",
                        "设置 PermitRootLogin no 并使用普通用户登录后切换到root"
                    )
                if sshd_settings.get("PasswordAuthentication") == "yes":
                    add_issue(
                        "medium",
                        "SSH密码认证已启用",
                        "PasswordAuthentication yes",
                        "考虑使用密钥认证替代密码认证"
                    )
                if sshd_settings.get("PermitEmptyPasswords") == "yes":
                    add_issue(
                        "critical",
                        "SSH允许空密码登录",
                        "PermitEmptyPasswords yes",
                        "设置 PermitEmptyPasswords no"
                    )

            # Pending security updates
            if scan_type == "packages" or run_all:
                if has_command("apt-get"):
                    # Debian/Ubuntu
                    security_updates = run("apt-get --simulate upgrade | grep -i 'security'")
                    if security_updates:
                        add_issue(
                            "high",
                            "有可用的安全更新未安装",
                            truncate(security_updates),
                            "运行 apt-get upgrade 安装更新"
                        )
                elif has_command("yum"):
                    # CentOS/RHEL
                    security_updates = run("yum check-update --security")
                    lowered = security_updates.lower()
                    # "No packages needed for security" also contains the
                    # phrase below, so it must be excluded explicitly.
                    if (
                        "needed for security" in lowered
                        and "no packages needed for security" not in lowered
                    ):
                        add_issue(
                            "high",
                            "有可用的安全更新未安装",
                            truncate(security_updates),
                            "运行 yum update --security 安装更新"
                        )

            result["vulnerabilities"] = vulnerabilities

            # Build the summary text from per-severity counts.
            severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
            for vuln in vulnerabilities:
                level = vuln.get("level")
                if level in severity_counts:
                    severity_counts[level] += 1

            total_vulns = sum(severity_counts.values())
            if total_vulns == 0:
                result["summary"] = "未发现安全漏洞。"
            else:
                parts = [
                    f"{count} 个{level}级"
                    for level, count in severity_counts.items()
                    if count > 0
                ]
                result["summary"] = f"发现 {total_vulns} 个安全问题: " + ", ".join(parts)

            result["status"] = "success"
    except Exception as e:
        # Tool boundary: report any failure through the result dict.
        result["status"] = "error"
        result["error"] = str(e)
    return result
# Core handler function for SSE variant: prefers advanced scanners (lynis, rkhunter, chkrootkit) if available on target, falls back to basic SSH/root login and password policy checks.
def security_vulnerability_scan(
    hostname: str,
    username: str,
    password: str = "",
    port: int = 22,
    scan_type: str = "basic",  # basic, full
    timeout: int = 60
) -> dict:
    """Run a security vulnerability scan on a remote host over SSH.

    The target is probed for lynis, rkhunter and chkrootkit. If none is
    installed an error result is returned. Otherwise the installed tools are
    run (lynis only when ``scan_type`` is ``"full"``) and their output is
    parsed for findings. When that yields no findings, two basic checks are
    run instead: root login in sshd_config and PASS_MAX_DAYS in login.defs.

    Args:
        hostname: Host to connect to.
        username: SSH login name.
        password: SSH password (empty string by default).
        port: SSH port.
        scan_type: ``"full"`` additionally runs ``lynis audit system``; any
            other value skips lynis.
        timeout: Timeout in seconds passed to SSHManager and used for short
            commands; scanner runs are given twice this value.

    Returns:
        A dict with the keys ``status`` (``"success"`` or ``"error"``),
        ``vulnerabilities`` (list of dicts with ``tool``, ``severity`` and
        ``description``), ``summary`` (per-severity counts) and ``error``.
        Failures are reported through the dict; no exception is propagated.
    """
    result = {"status": "unknown", "vulnerabilities": [], "summary": {}, "error": ""}

    try:
        with SSHManager(hostname, username, password, port, timeout) as ssh:

            def run(command, limit):
                # Execute a remote command and return its stripped stdout.
                # Undecodable bytes are replaced instead of aborting the scan.
                _stdin, stdout, _stderr = ssh.exec_command(command, timeout=limit)
                return stdout.read().decode(errors="replace").strip()

            # Detect which of the common scanners are installed on the target.
            scan_tools = ["lynis", "rkhunter", "chkrootkit"]
            available_tools = [
                tool for tool in scan_tools if run(f"command -v {tool}", timeout)
            ]

            if not available_tools:
                result["status"] = "error"
                result["error"] = "未找到可用的安全扫描工具,请安装 lynis, rkhunter 或 chkrootkit"
                return result

            vulnerabilities = []
            summary = {"critical": 0, "high": 0, "medium": 0, "low": 0}

            def record(tool, severity, description):
                # Keep the per-severity counter and the findings list in step.
                summary[severity] += 1
                vulnerabilities.append({
                    "tool": tool,
                    "severity": severity,
                    "description": description
                })

            # Run each installed scanner and parse its output line by line.
            for tool in available_tools:
                if tool == "lynis" and scan_type == "full":
                    output = run("lynis audit system", timeout * 2)
                    for line in output.split('\n'):
                        if "Warning:" not in line:
                            continue
                        if "Critical" in line:
                            severity = "critical"
                        elif "High" in line:
                            severity = "high"
                        else:
                            severity = "medium"
                        record("lynis", severity, line.strip())

                elif tool == "rkhunter":
                    output = run("rkhunter --check --skip-keypress", timeout * 2)
                    for line in output.split('\n'):
                        if "Warning:" in line or "[Warning]" in line:
                            record("rkhunter", "high", line.strip())

                elif tool == "chkrootkit":
                    output = run("chkrootkit", timeout * 2)
                    for line in output.split('\n'):
                        if "INFECTED" in line:
                            record("chkrootkit", "critical", line.strip())

            # Fallback: the scanners reported nothing (or none was actually
            # run, e.g. only lynis is installed and scan_type is not "full"),
            # so perform the basic configuration checks.
            if not vulnerabilities:
                # sshd_config: only an active (non-comment) directive counts.
                # A plain substring test would also match "#PermitRootLogin yes".
                output = run("grep PermitRootLogin /etc/ssh/sshd_config", timeout)
                for line in output.splitlines():
                    fields = line.split()
                    if len(fields) >= 2 and fields[0].lower() == "permitrootlogin":
                        if fields[1].lower() == "yes":
                            record("basic_check", "high", "SSH允许root直接登录,存在安全风险")
                        break  # sshd uses the first value it finds for a keyword

                # login.defs: the grep also returns comment lines, so pick the
                # value from real "PASS_MAX_DAYS <number>" lines only rather
                # than calling int() on whatever token happens to come last.
                output = run("grep PASS_MAX_DAYS /etc/login.defs", timeout)
                max_days = None
                for line in output.splitlines():
                    fields = line.split()
                    if len(fields) >= 2 and fields[0] == "PASS_MAX_DAYS" and fields[1].isdigit():
                        max_days = fields[1]  # keep the last definition found
                if max_days is not None and int(max_days) > 90:
                    record("basic_check", "medium", f"密码最长使用天数过长: {max_days}天")

            result["vulnerabilities"] = vulnerabilities
            result["summary"] = summary
            result["status"] = "success"
    except Exception as e:
        # Tool boundary: report any failure through the result dict.
        result["status"] = "error"
        result["error"] = str(e)
    return result
- server_monitor/main.py:51-51 (registration) Tool registration in the main MCP server dictionary, imported from tools and registered via loop with mcp.tool() decorator: 'security_vulnerability_scan': security_vulnerability_scan,
- server_monitor_sse/server.py:157-170 (registration) Dispatch logic in tool_handler for SSE server, mapping tool name to function call with argument validation: elif name == "security_vulnerability_scan": required_args = ["hostname", "username"] for arg in required_args: if arg not in arguments: raise ValueError(f"Missing required argument '{arg}'") result = security_vulnerability_scan( hostname=arguments["hostname"], username=arguments["username"], password=arguments.get("password", ""), port=arguments.get("port", 22), scan_type=arguments.get("scan_type", "basic"), timeout=arguments.get("timeout", 60) )
- Tool schema definition including input parameters and descriptions used for listing available tools: {"name": "security_vulnerability_scan", "description": "执行基础安全漏洞扫描", "parameters": [ {"name": "hostname", "type": "str", "default": None}, {"name": "username", "type": "str", "default": None}, {"name": "password", "type": "str", "default": ""}, {"name": "port", "type": "int", "default": 22}, {"name": "scan_type", "type": "str", "default": "basic"}, {"name": "timeout", "type": "int", "default": 60} ]},