check_ssh_risk_logins
Analyze SSH login risks by monitoring failed attempts and suspicious IPs using specified thresholds and log files for enhanced server security.
Instructions
Check SSH login risk, including failed attempts and suspicious IPs
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| hostname | Yes | Host to connect to over SSH | |
| log_file | No | Path of the SSH authentication log to inspect; if it does not exist the tool falls back to /var/log/secure and then /var/log/audit/audit.log | /var/log/auth.log |
| password | No | SSH password for `username` (empty by default) | "" |
| port | No | SSH port on the target host | 22 |
| threshold | No | Minimum number of failed attempts from one source IP for it to be reported as suspicious; more than twice the threshold is rated "high", otherwise "medium" | 5 |
| timeout | No | SSH connection and per-command timeout, in seconds | 30 |
| username | Yes | SSH username used to log in and read the log | |
Input Schema (JSON Schema)
{
"properties": {
"hostname": {
"title": "Hostname",
"type": "string"
},
"log_file": {
"default": "/var/log/auth.log",
"title": "Log File",
"type": "string"
},
"password": {
"default": "",
"title": "Password",
"type": "string"
},
"port": {
"default": 22,
"title": "Port",
"type": "integer"
},
"threshold": {
"default": 5,
"title": "Threshold",
"type": "integer"
},
"timeout": {
"default": 30,
"title": "Timeout",
"type": "integer"
},
"username": {
"title": "Username",
"type": "string"
}
},
"required": [
"hostname",
"username"
],
"title": "check_ssh_risk_loginsArguments",
"type": "object"
}
Implementation Reference
- Core handler function for the check_ssh_risk_logins tool. Connects over SSH, locates the SSH auth log (falling back to /var/log/secure and /var/log/audit/audit.log), pulls the last 1000 sshd lines, parses them with ServerInspector.parse_auth_log, flags source IPs whose failed-attempt count reaches the threshold (more than twice the threshold is rated "high"), and returns a structured risk assessment. Two caveats for operators: only the last 1000 sshd lines are examined, so an older or long-running brute force can fall outside the window, and hosts that log only to journald (no rsyslog) have none of these files, so the tool reports an error there. The audit.log fallback also uses auditd's key=value record format rather than syslog lines, so it only helps if parse_auth_log understands that format.

```python
import shlex

@handle_exceptions
def check_ssh_risk_logins(
    hostname: str,
    username: str,
    password: str = "",
    port: int = 22,
    log_file: str = "/var/log/auth.log",
    threshold: int = 5,
    timeout: int = 30
) -> dict:
    """Check SSH login risk, including failed attempts and suspicious IPs."""
    result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {},
              "success_logins": [], "error": ""}
    try:
        with SSHManager(hostname, username, password, port, timeout) as ssh:
            # Check whether the log file exists. log_file is caller-supplied and is
            # interpolated into a remote shell command, so quote it.
            file_check = f"[ -f {shlex.quote(log_file)} ] && echo 'exists' || echo 'not found'"
            stdin, stdout, stderr = ssh.exec_command(file_check, timeout=timeout)
            file_exists = stdout.read().decode().strip() == "exists"

            # If the primary log is missing, try the alternative log files
            if not file_exists:
                alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"]
                for alt_log in alternative_logs:
                    file_check = f"[ -f {alt_log} ] && echo 'exists' || echo 'not found'"
                    stdin, stdout, stderr = ssh.exec_command(file_check, timeout=timeout)
                    if stdout.read().decode().strip() == "exists":
                        log_file = alt_log
                        file_exists = True
                        break

            if not file_exists:
                result["error"] = "SSH log file not found"
                result["status"] = "error"
                return result

            # Fetch the most recent sshd lines from the log
            log_command = f"grep 'sshd' {shlex.quote(log_file)} | tail -n 1000"
            stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout)
            log_content = stdout.read().decode().strip()
            # grep reports an unreadable log on stderr (auth.log is usually root:adm
            # 0640, so an SSH user outside adm gets "Permission denied"); without this
            # check an empty result would be reported as "success" with no findings.
            grep_error = stderr.read().decode().strip()
            if grep_error:
                result["error"] = grep_error
                result["status"] = "error"
                return result

            # Parse the log
            failed_logins, success_logins = ServerInspector.parse_auth_log(log_content)

            # Find source IPs whose failed attempts reach the threshold
            suspicious_ips = [
                {"ip": ip, "attempts": count,
                 "risk_level": "high" if count > threshold * 2 else "medium"}
                for ip, count in failed_logins.items()
                if count >= threshold
            ]

            # Sort by number of attempts, most first
            suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True)

            result["suspicious_ips"] = suspicious_ips
            result["failed_logins"] = failed_logins
            result["success_logins"] = success_logins
            result["status"] = "success"
    except Exception as e:
        result["status"] = "error"
        result["error"] = str(e)
    return result
```
- SSE variant core handler for the check_ssh_risk_logins tool. Same logic as above, except that log file detection runs `ls` on the path and treats any stderr output as "missing"; the rest (fetching and parsing the auth log, suspicious IP detection, sorting) is identical, with the same 1000-line window and the same caveats about journald-only hosts and the audit.log record format.

```python
import shlex

@handle_exceptions
def check_ssh_risk_logins(
    hostname: str,
    username: str,
    password: str = "",
    port: int = 22,
    log_file: str = "/var/log/auth.log",
    threshold: int = 5,
    timeout: int = 30
) -> dict:
    """Check SSH login risk, including failed attempts and suspicious IPs."""
    result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {},
              "success_logins": [], "error": ""}
    try:
        with SSHManager(hostname, username, password, port, timeout) as ssh:
            # Check whether the log file exists: ls writes to stderr if it does not.
            # log_file is caller-supplied and runs in the remote shell, so quote it.
            stdin, stdout, stderr = ssh.exec_command(f"ls {shlex.quote(log_file)}", timeout=timeout)
            if stderr.read().decode().strip():
                # Try the other common log file locations
                alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"]
                for alt_log in alternative_logs:
                    stdin, stdout, stderr = ssh.exec_command(f"ls {alt_log}", timeout=timeout)
                    if not stderr.read().decode().strip():
                        log_file = alt_log
                        break
                else:
                    # No break: none of the alternatives exist either
                    result["status"] = "error"
                    result["error"] = f"SSH log file not found: {log_file} or any other common log file"
                    return result

            # Fetch the most recent sshd lines from the log
            log_command = f"grep 'sshd' {shlex.quote(log_file)} | tail -n 1000"
            stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout)
            log_content = stdout.read().decode().strip()
            # ls succeeds on a file the SSH user cannot read, so an unreadable log
            # only shows up here, as a grep error on stderr. Report it rather than
            # returning an empty "success".
            grep_error = stderr.read().decode().strip()
            if grep_error:
                result["status"] = "error"
                result["error"] = grep_error
                return result

            # Parse the log
            failed_logins, success_logins = ServerInspector.parse_auth_log(log_content)

            # Find source IPs whose failed attempts reach the threshold
            suspicious_ips = [
                {"ip": ip, "attempts": count,
                 "risk_level": "high" if count > threshold * 2 else "medium"}
                for ip, count in failed_logins.items()
                if count >= threshold
            ]

            # Sort by number of attempts, most first
            suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True)

            result["suspicious_ips"] = suspicious_ips
            result["failed_logins"] = failed_logins
            result["success_logins"] = success_logins
            result["status"] = "success"
    except Exception as e:
        result["status"] = "error"
        result["error"] = str(e)
    return result
```
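Both handlers delegate the actual log parsing to ServerInspector.parse_auth_log, which this page does not show. The block below is an added example, not the project's code, of what such a parser and the tool's threshold rule look like when run over constructed sshd syslog lines (the sample log, and the names parse_auth_log, find_suspicious_ips, successes_after_failures and assess, are assumptions made for the example). It goes one step beyond the tool by also reporting accepted sessions from a source IP that had failed attempts in the same window, which is the trace a brute force that eventually succeeded would leave.

```python
import re
from collections import Counter

# Constructed sample lines (not real logs) in the syslog format OpenSSH writes to
# /var/log/auth.log (Debian/Ubuntu) or /var/log/secure (RHEL-family).
SAMPLE_AUTH_LOG = """\
Mar  3 10:01:12 web01 sshd[2231]: Failed password for invalid user admin from 203.0.113.7 port 51234 ssh2
Mar  3 10:01:15 web01 sshd[2231]: Failed password for invalid user admin from 203.0.113.7 port 51240 ssh2
Mar  3 10:01:18 web01 sshd[2235]: Failed password for root from 203.0.113.7 port 51250 ssh2
Mar  3 10:01:20 web01 sshd[2235]: Failed password for root from 203.0.113.7 port 51251 ssh2
Mar  3 10:01:22 web01 sshd[2235]: Failed password for root from 203.0.113.7 port 51252 ssh2
Mar  3 10:02:00 web01 sshd[2240]: Failed password for alice from 198.51.100.4 port 40010 ssh2
Mar  3 10:02:30 web01 sshd[2240]: Accepted publickey for alice from 198.51.100.4 port 40011 ssh2: RSA SHA256:abc
Mar  3 10:03:00 web01 sshd[2250]: Invalid user test from 203.0.113.9 port 33000
Mar  3 10:03:01 web01 sshd[2250]: Failed password for invalid user test from 203.0.113.9 port 33000 ssh2
Mar  3 10:05:00 web01 sshd[2260]: Accepted password for bob from 192.0.2.10 port 50000 ssh2
"""

# "Failed password" / "Failed publickey" lines. The "invalid user" prefix is
# optional: sshd adds it only when the account does not exist.
FAILED_RE = re.compile(
    r"sshd\[\d+\]: Failed (?:password|publickey) for (?:invalid user )?"
    r"(?P<user>\S+) from (?P<ip>[0-9A-Fa-f.:]+) port (?P<port>\d+)"
)
ACCEPTED_RE = re.compile(
    r"sshd\[\d+\]: Accepted (?P<method>\w+) for (?P<user>\S+) "
    r"from (?P<ip>[0-9A-Fa-f.:]+) port (?P<port>\d+)"
)


def parse_auth_log(log_content: str):
    """Return (failed_logins, success_logins) in the shape the handler expects:
    failed_logins maps source IP -> number of failed attempts, success_logins is
    a list with one dict per accepted session."""
    failed_logins = Counter()
    success_logins = []
    for line in log_content.splitlines():
        m = FAILED_RE.search(line)
        if m:
            failed_logins[m["ip"]] += 1
            continue
        m = ACCEPTED_RE.search(line)
        if m:
            success_logins.append({"user": m["user"], "ip": m["ip"],
                                   "method": m["method"], "port": int(m["port"])})
        # "Invalid user X from ..." lines are deliberately ignored: sshd also logs
        # "Failed password for invalid user X" for the same attempt, so counting
        # both would double-count each probe of a non-existent account.
    return dict(failed_logins), success_logins


def find_suspicious_ips(failed_logins: dict, threshold: int = 5) -> list:
    """The tool's rule: >= threshold failures is suspicious, > 2x threshold is high."""
    suspicious = [
        {"ip": ip, "attempts": count,
         "risk_level": "high" if count > threshold * 2 else "medium"}
        for ip, count in failed_logins.items()
        if count >= threshold
    ]
    suspicious.sort(key=lambda x: x["attempts"], reverse=True)
    return suspicious


def successes_after_failures(failed_logins: dict, success_logins: list) -> list:
    """Extra check not made by the tool: accepted sessions from an IP that also
    produced failures in the same window. Worth a look even below the threshold."""
    return [s for s in success_logins if failed_logins.get(s["ip"], 0) > 0]


def assess(log_content: str, threshold: int = 5) -> dict:
    """Build a result dict in the same shape the handler returns, plus the extra check."""
    failed, success = parse_auth_log(log_content)
    return {
        "status": "success",
        "suspicious_ips": find_suspicious_ips(failed, threshold),
        "failed_logins": failed,
        "success_logins": success,
        "successes_after_failures": successes_after_failures(failed, success),
        "error": "",
    }


if __name__ == "__main__":
    import json
    # With threshold=2, 203.0.113.7 (5 failures) is rated "high"; with the tool's
    # default of 5 it is "medium", and alice's key login after one failed password
    # from 198.51.100.4 is listed under successes_after_failures either way.
    print(json.dumps(assess(SAMPLE_AUTH_LOG, threshold=2), indent=2))
```

The regexes are anchored on the `sshd[pid]:` prefix rather than the timestamp, so they work on both traditional syslog lines and on `journalctl -u ssh -o short` output, which is how the same data can be pulled from journald-only hosts that have no auth.log.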
- Tool schema definition including name, description, and parameter types/defaults for input validation.

```python
{"name": "check_ssh_risk_logins",
 "description": "Check SSH login risk, including failed attempts and suspicious IPs",
 "parameters": [
     {"name": "hostname", "type": "str", "default": None},
     {"name": "username", "type": "str", "default": None},
     {"name": "password", "type": "str", "default": ""},
     {"name": "port", "type": "int", "default": 22},
     {"name": "log_file", "type": "str", "default": "/var/log/auth.log"},
     {"name": "threshold", "type": "int", "default": 5},
     {"name": "timeout", "type": "int", "default": 30}
 ]},
```
- SSE variant tool schema definition for check_ssh_risk_logins (identical to the one above).

```python
{"name": "check_ssh_risk_logins",
 "description": "Check SSH login risk, including failed attempts and suspicious IPs",
 "parameters": [
     {"name": "hostname", "type": "str", "default": None},
     {"name": "username", "type": "str", "default": None},
     {"name": "password", "type": "str", "default": ""},
     {"name": "port", "type": "int", "default": 22},
     {"name": "log_file", "type": "str", "default": "/var/log/auth.log"},
     {"name": "threshold", "type": "int", "default": 5},
     {"name": "timeout", "type": "int", "default": 30}
 ]},
```