Skip to main content
Glama

OPS MCP Server

by Heht571
security_tools.py12.6 kB
"""安全相关工具函数""" import re from core.ssh_manager import SSHManager from core.inspector import ServerInspector from utils.decorators import handle_exceptions from config.logger import logger @handle_exceptions def check_ssh_risk_logins( hostname: str, username: str, password: str = "", port: int = 22, log_file: str = "/var/log/auth.log", threshold: int = 5, timeout: int = 30 ) -> dict: """检查SSH登录风险,包括失败尝试和可疑IP""" result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {}, "success_logins": [], "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查日志文件是否存在 stdin, stdout, stderr = ssh.exec_command(f"ls {log_file}", timeout=timeout) if stderr.read().decode().strip(): # 尝试其他常见的日志文件 alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"] for alt_log in alternative_logs: stdin, stdout, stderr = ssh.exec_command(f"ls {alt_log}", timeout=timeout) if not stderr.read().decode().strip(): log_file = alt_log break else: result["status"] = "error" result["error"] = f"找不到SSH日志文件: {log_file} 或其他常见日志文件" return result # 获取日志内容 log_command = f"grep 'sshd' {log_file} | tail -n 1000" stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout) log_content = stdout.read().decode().strip() # 解析日志 failed_logins, success_logins = ServerInspector.parse_auth_log(log_content) # 找出超过阈值的可疑IP suspicious_ips = [ {"ip": ip, "attempts": count, "risk_level": "high" if count > threshold * 2 else "medium"} for ip, count in failed_logins.items() if count >= threshold ] # 按尝试次数排序 suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True) result["suspicious_ips"] = suspicious_ips result["failed_logins"] = failed_logins result["success_logins"] = success_logins result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result @handle_exceptions def check_firewall_config( hostname: str, username: str, password: str = "", port: int = 22, timeout: int = 30 ) -> dict: """检查防火墙配置""" result = {"status": "unknown", "firewall_info": {}, "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查常见的防火墙服务 firewall_services = ["ufw", "firewalld", "iptables"] firewall_info = {} for fw in firewall_services: if fw == "ufw": stdin, stdout, stderr = ssh.exec_command("ufw status", timeout=timeout) output = stdout.read().decode().strip() if "Status: active" in output: firewall_info["ufw"] = { "active": True, "rules": output.split('\n')[1:] if len(output.split('\n')) > 1 else [] } else: firewall_info["ufw"] = {"active": False} elif fw == "firewalld": stdin, stdout, stderr = ssh.exec_command("firewall-cmd --state", timeout=timeout) output = stdout.read().decode().strip() if output == "running": # 获取区域信息 stdin, stdout, stderr = ssh.exec_command("firewall-cmd --list-all", timeout=timeout) zones_output = stdout.read().decode().strip() firewall_info["firewalld"] = { "active": True, "config": zones_output } else: firewall_info["firewalld"] = {"active": False} elif fw == "iptables": stdin, stdout, stderr = ssh.exec_command("iptables -L", timeout=timeout) output = stdout.read().decode().strip() # 简单检查是否有规则 has_rules = len(output.split('\n')) > 6 # 基本的链定义通常有6行 firewall_info["iptables"] = { "active": has_rules, "rules": output.split('\n') } result["firewall_info"] = firewall_info result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result @handle_exceptions def security_vulnerability_scan( hostname: str, username: str, password: str = "", port: int = 22, scan_type: str = "basic", # basic, full timeout: int = 60 ) -> dict: """执行安全漏洞扫描""" result = {"status": "unknown", "vulnerabilities": [], "summary": {}, "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查是否安装了常见的安全扫描工具 scan_tools = ["lynis", "rkhunter", "chkrootkit"] available_tools = [] for tool in scan_tools: stdin, stdout, stderr = ssh.exec_command(f"command -v {tool}", timeout=timeout) if stdout.read().decode().strip(): available_tools.append(tool) if not available_tools: result["status"] = "error" result["error"] = "未找到可用的安全扫描工具,请安装 lynis, rkhunter 或 chkrootkit" return result vulnerabilities = [] summary = {"critical": 0, "high": 0, "medium": 0, "low": 0} # 使用可用的工具进行扫描 for tool in available_tools: if tool == "lynis" and scan_type == "full": stdin, stdout, stderr = ssh.exec_command("lynis audit system", timeout=timeout*2) output = stdout.read().decode().strip() # 解析Lynis输出 for line in output.split('\n'): if "Warning:" in line: severity = "medium" if "Critical" in line: severity = "critical" summary["critical"] += 1 elif "High" in line: severity = "high" summary["high"] += 1 else: summary["medium"] += 1 vulnerabilities.append({ "tool": "lynis", "severity": severity, "description": line.strip() }) elif tool == "rkhunter": stdin, stdout, stderr = ssh.exec_command("rkhunter --check --skip-keypress", timeout=timeout*2) output = stdout.read().decode().strip() # 解析rkhunter输出 for line in output.split('\n'): if "Warning:" in line or "[Warning]" in line: summary["high"] += 1 vulnerabilities.append({ "tool": "rkhunter", "severity": "high", "description": line.strip() }) elif tool == "chkrootkit": stdin, stdout, stderr = ssh.exec_command("chkrootkit", timeout=timeout*2) output = stdout.read().decode().strip() # 解析chkrootkit输出 for line in output.split('\n'): if "INFECTED" in line: summary["critical"] += 1 vulnerabilities.append({ "tool": "chkrootkit", "severity": "critical", "description": line.strip() }) # 如果没有专业工具,执行基本的安全检查 if not vulnerabilities: # 检查SSH配置 stdin, stdout, stderr = ssh.exec_command("grep PermitRootLogin /etc/ssh/sshd_config", timeout=timeout) output = stdout.read().decode().strip() if "yes" in output: summary["high"] += 1 vulnerabilities.append({ "tool": "basic_check", "severity": "high", "description": "SSH允许root直接登录,存在安全风险" }) # 检查密码策略 stdin, stdout, stderr = ssh.exec_command("grep PASS_MAX_DAYS /etc/login.defs", timeout=timeout) output = stdout.read().decode().strip() if output and int(output.split()[-1]) > 90: summary["medium"] += 1 vulnerabilities.append({ "tool": "basic_check", "severity": "medium", "description": f"密码最长使用天数过长: {output.split()[-1]}天" }) result["vulnerabilities"] = vulnerabilities result["summary"] = summary result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result @handle_exceptions def backup_critical_files( hostname: str, username: str, password: str = "", port: int = 22, backup_dir: str = "/tmp/backup", files_to_backup: list[str] = ["/etc/passwd", "/etc/shadow", "/etc/ssh/sshd_config"], timeout: int = 30 ) -> dict: """备份关键系统文件""" result = {"status": "unknown", "backup_results": [], "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 创建备份目录 stdin, stdout, stderr = ssh.exec_command(f"mkdir -p {backup_dir}", timeout=timeout) error = stderr.read().decode().strip() if error: result["status"] = "error" result["error"] = f"创建备份目录失败: {error}" return result backup_results = [] for file_path in files_to_backup: # 检查文件是否存在 stdin, stdout, stderr = ssh.exec_command(f"ls {file_path}", timeout=timeout) error = stderr.read().decode().strip() if error: backup_results.append({ "file": file_path, "status": "error", "message": f"文件不存在: {error}" }) continue # 获取文件名 file_name = file_path.split('/')[-1] backup_path = f"{backup_dir}/{file_name}.bak" # 备份文件 stdin, stdout, stderr = ssh.exec_command(f"cp {file_path} {backup_path}", timeout=timeout) error = stderr.read().decode().strip() if error: backup_results.append({ "file": file_path, "status": "error", "message": f"备份失败: {error}" }) else: backup_results.append({ "file": file_path, "status": "success", "backup_path": backup_path }) result["backup_results"] = backup_results result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server