check_ssh_risk_logins
Analyze SSH login logs to detect failed attempts and suspicious IP addresses, identifying potential security threats on remote servers.
Instructions
Check SSH login risk, including failed attempts and suspicious IPs.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| hostname | Yes | | |
| username | Yes | | |
| password | No | | "" (empty string) |
| port | No | | 22 |
| log_file | No | | /var/log/auth.log |
| threshold | No | | 5 |
| timeout | No | | 30 |
Implementation Reference
- Primary handler implementation for the check_ssh_risk_logins tool. It connects via SSH, locates the auth log, parses SSH login attempts, and flags suspicious IPs based on a failure threshold.

      def check_ssh_risk_logins(
          hostname: str,
          username: str,
          password: str = "",
          port: int = 22,
          log_file: str = "/var/log/auth.log",
          threshold: int = 5,
          timeout: int = 30
      ) -> dict:
          """Check SSH login risk, including failed attempts and suspicious IPs."""
          result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {}, "success_logins": [], "error": ""}

          try:
              with SSHManager(hostname, username, password, port, timeout) as ssh:
                  # Check whether the log file exists
                  file_check = f"[ -f {log_file} ] && echo 'exists' || echo 'not found'"
                  stdin, stdout, stderr = ssh.exec_command(file_check, timeout=timeout)
                  file_exists = stdout.read().decode().strip() == "exists"

                  # If the primary log is missing, try the fallback log files
                  if not file_exists:
                      alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"]
                      for alt_log in alternative_logs:
                          file_check = f"[ -f {alt_log} ] && echo 'exists' || echo 'not found'"
                          stdin, stdout, stderr = ssh.exec_command(file_check, timeout=timeout)
                          if stdout.read().decode().strip() == "exists":
                              log_file = alt_log
                              file_exists = True
                              break

                  if not file_exists:
                      result["error"] = "SSH log file not found"
                      result["status"] = "error"
                      return result

                  # Fetch the log content (last 1000 sshd lines)
                  log_command = f"grep 'sshd' {log_file} | tail -n 1000"
                  stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout)
                  log_content = stdout.read().decode().strip()

                  # Parse the log
                  failed_logins, success_logins = ServerInspector.parse_auth_log(log_content)

                  # Collect suspicious IPs that reach the threshold
                  suspicious_ips = [
                      {"ip": ip, "attempts": count, "risk_level": "high" if count > threshold * 2 else "medium"}
                      for ip, count in failed_logins.items()
                      if count >= threshold
                  ]

                  # Sort by number of attempts, highest first
                  suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True)

                  result["suspicious_ips"] = suspicious_ips
                  result["failed_logins"] = failed_logins
                  result["success_logins"] = success_logins
                  result["status"] = "success"
          except Exception as e:
              result["status"] = "error"
              result["error"] = str(e)

          return result
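The handler delegates parsing to ServerInspector.parse_auth_log, whose body is not shown on this page. As a rough illustration only, the sketch below shows one way such a parser might turn raw sshd lines into the two values the handler expects. The regular expressions, the shape of each success_logins entry, and the sample log lines are all assumptions, and real OpenSSH log formats vary by distribution and version.

```python
import re
from collections import Counter

# Assumed patterns for common OpenSSH syslog messages (not taken from the project).
FAILED_RE = re.compile(r"Failed password for (?:invalid user )?(\S+) from (\S+) port \d+")
ACCEPTED_RE = re.compile(r"Accepted (\S+) for (\S+) from (\S+) port \d+")


def parse_auth_log(log_content: str):
    """Return (failed_logins, success_logins) parsed from raw sshd log text.

    failed_logins maps source IP -> number of failed attempts.
    success_logins is a list of dicts; this entry shape is hypothetical.
    """
    failed = Counter()
    success = []
    for line in log_content.splitlines():
        match = FAILED_RE.search(line)
        if match:
            failed[match.group(2)] += 1  # count failures per source IP
            continue
        match = ACCEPTED_RE.search(line)
        if match:
            success.append(
                {"method": match.group(1), "user": match.group(2), "ip": match.group(3)}
            )
    return dict(failed), success


# Made-up sample lines using documentation-only IP ranges.
sample = "\n".join([
    "Jan 10 10:00:01 host sshd[101]: Failed password for root from 203.0.113.7 port 40000 ssh2",
    "Jan 10 10:00:03 host sshd[101]: Failed password for invalid user admin from 203.0.113.7 port 40002 ssh2",
    "Jan 10 10:05:00 host sshd[202]: Accepted publickey for deploy from 198.51.100.4 port 51000 ssh2",
])
failed_logins, success_logins = parse_auth_log(sample)
print(failed_logins)
print(success_logins)
```

Keying failed_logins by IP matches how the handler iterates over failed_logins.items() to build suspicious_ips.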
- SSE variant handler for the check_ssh_risk_logins tool. It applies the same risk assessment logic but probes for the log file with ls instead of a shell file test.

      def check_ssh_risk_logins(
          hostname: str,
          username: str,
          password: str = "",
          port: int = 22,
          log_file: str = "/var/log/auth.log",
          threshold: int = 5,
          timeout: int = 30
      ) -> dict:
          """Check SSH login risk, including failed attempts and suspicious IPs."""
          result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {}, "success_logins": [], "error": ""}

          try:
              with SSHManager(hostname, username, password, port, timeout) as ssh:
                  # Check whether the log file exists
                  stdin, stdout, stderr = ssh.exec_command(f"ls {log_file}", timeout=timeout)
                  if stderr.read().decode().strip():
                      # Try other common log files
                      alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"]
                      for alt_log in alternative_logs:
                          stdin, stdout, stderr = ssh.exec_command(f"ls {alt_log}", timeout=timeout)
                          if not stderr.read().decode().strip():
                              log_file = alt_log
                              break
                      else:
                          result["status"] = "error"
                          result["error"] = f"SSH log file not found: {log_file} or other common log files"
                          return result

                  # Fetch the log content (last 1000 sshd lines)
                  log_command = f"grep 'sshd' {log_file} | tail -n 1000"
                  stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout)
                  log_content = stdout.read().decode().strip()

                  # Parse the log
                  failed_logins, success_logins = ServerInspector.parse_auth_log(log_content)

                  # Collect suspicious IPs that reach the threshold
                  suspicious_ips = [
                      {"ip": ip, "attempts": count, "risk_level": "high" if count > threshold * 2 else "medium"}
                      for ip, count in failed_logins.items()
                      if count >= threshold
                  ]

                  # Sort by number of attempts, highest first
                  suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True)

                  result["suspicious_ips"] = suspicious_ips
                  result["failed_logins"] = failed_logins
                  result["success_logins"] = success_logins
                  result["status"] = "success"
          except Exception as e:
              result["status"] = "error"
              result["error"] = str(e)

          return result
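Both handlers apply the same threshold rule: an IP is reported once its failure count reaches threshold, and it is rated "high" only when the count exceeds twice the threshold. The sketch below isolates that rule so the boundaries are easy to see. The function name classify_suspicious_ips and the sample counts are made up for illustration; the comparison logic is copied from the handlers.

```python
def classify_suspicious_ips(failed_logins: dict, threshold: int = 5) -> list:
    """Apply the handlers' threshold rule to a mapping of IP -> failed attempts."""
    suspicious = [
        {
            "ip": ip,
            "attempts": count,
            # Strictly greater than 2x threshold is "high"; everything else reported is "medium".
            "risk_level": "high" if count > threshold * 2 else "medium",
        }
        for ip, count in failed_logins.items()
        if count >= threshold  # IPs below the threshold are not reported at all
    ]
    suspicious.sort(key=lambda item: item["attempts"], reverse=True)
    return suspicious


# Made-up counts chosen to hit each boundary with threshold=5.
counts = {"192.0.2.10": 4, "192.0.2.9": 5, "198.51.100.4": 10, "203.0.113.7": 12}
report = classify_suspicious_ips(counts, threshold=5)
for entry in report:
    print(entry["ip"], entry["attempts"], entry["risk_level"])
# 203.0.113.7 12 high
# 198.51.100.4 10 medium
# 192.0.2.9 5 medium
```

Note the asymmetry: the reporting check is inclusive (>=) while the "high" check is exclusive (>), so with the default threshold of 5 an IP with exactly 10 failures is still rated "medium".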
- server_monitor/main.py:42-61 (registration): Tool registration in the MCP server. It imports the function and registers it dynamically with the mcp.tool() decorator.

      tools_dict = {
          'get_memory_info': get_memory_info,
          'remote_server_inspection': remote_server_inspection,
          'get_system_load': get_system_load,
          'monitor_processes': monitor_processes,
          'check_service_status': check_service_status,
          'get_os_details': get_os_details,
          'check_ssh_risk_logins': check_ssh_risk_logins,
          'check_firewall_config': check_firewall_config,
          'security_vulnerability_scan': security_vulnerability_scan,
          'backup_critical_files': backup_critical_files,
          'inspect_network': inspect_network,
          'analyze_logs': analyze_logs,
          'list_docker_containers': list_docker_containers,
          'list_docker_images': list_docker_images,
          'list_docker_volumes': list_docker_volumes,
          'get_container_logs': get_container_logs,
          'monitor_container_stats': monitor_container_stats,
          'check_docker_health': check_docker_health
      }
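The excerpt shows only the dictionary, not the loop that consumes it. One plausible way to register every entry through a tool() decorator factory is sketched below. FakeMCP is a stand-in written for this example so the snippet runs without the MCP SDK, and the two stub functions are placeholders; the project's actual loop may differ.

```python
class FakeMCP:
    """Stand-in for the real server object, present only so this sketch is runnable."""

    def __init__(self):
        self.tools = {}

    def tool(self):
        # Mirrors a decorator factory: mcp.tool() returns a decorator.
        def decorator(func):
            self.tools[func.__name__] = func
            return func
        return decorator


# Placeholder tool functions; the real ones live in the project's modules.
def get_memory_info():
    return {"status": "success"}


def check_ssh_risk_logins(hostname, username):
    return {"status": "success", "hostname": hostname}


mcp = FakeMCP()
tools_dict = {
    "get_memory_info": get_memory_info,
    "check_ssh_risk_logins": check_ssh_risk_logins,
}

# Calling the decorator by hand registers each function without @-syntax.
for name, func in tools_dict.items():
    mcp.tool()(func)

print(sorted(mcp.tools))
```

Applying the decorator as a plain function call is what makes registration "dynamic": adding a tool means adding one dictionary entry rather than decorating each function at its definition.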
- server_monitor_sse/server.py:127-141 (registration): Dispatch registration in the SSE server. Tool calls are routed through an if-elif chain; this branch validates the required arguments and invokes check_ssh_risk_logins.

      elif name == "check_ssh_risk_logins":
          required_args = ["hostname", "username"]
          for arg in required_args:
              if arg not in arguments:
                  raise ValueError(f"Missing required argument '{arg}'")
          result = check_ssh_risk_logins(
              hostname=arguments["hostname"],
              username=arguments["username"],
              password=arguments.get("password", ""),
              port=arguments.get("port", 22),
              log_file=arguments.get("log_file", "/var/log/auth.log"),
              threshold=arguments.get("threshold", 5),
              timeout=arguments.get("timeout", 30)
          )
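For comparison, the same validate-then-call pattern can be written as a lookup table instead of one elif branch per tool. This is an alternative sketch, not the SSE server's code: TOOL_SPECS and dispatch are hypothetical names, and the handler here is a stub that only echoes its arguments.

```python
def check_ssh_risk_logins(hostname, username, password="", port=22,
                          log_file="/var/log/auth.log", threshold=5, timeout=30):
    # Stub standing in for the real handler; it performs no SSH connection.
    return {"status": "success", "hostname": hostname, "port": port, "threshold": threshold}


# Hypothetical table: tool name -> (handler, names of required arguments).
TOOL_SPECS = {
    "check_ssh_risk_logins": (check_ssh_risk_logins, ["hostname", "username"]),
}


def dispatch(name: str, arguments: dict) -> dict:
    """Validate required arguments, then call the handler registered under name."""
    if name not in TOOL_SPECS:
        raise ValueError(f"Unknown tool '{name}'")
    handler, required_args = TOOL_SPECS[name]
    for arg in required_args:
        if arg not in arguments:
            raise ValueError(f"Missing required argument '{arg}'")
    # Optional arguments that are absent fall back to the handler's own defaults.
    return handler(**arguments)


result = dispatch("check_ssh_risk_logins",
                  {"hostname": "server.example", "username": "ops", "threshold": 3})
print(result)
```

One trade-off: passing arguments with ** means an unexpected key raises TypeError, whereas the explicit arguments.get(...) calls in the elif branch silently ignore unknown keys.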
- Schema definition for the tool's input parameters, used in the tool listing.

      {"name": "check_ssh_risk_logins",
       "description": "Check SSH login risk, including failed attempts and suspicious IPs",
       "parameters": [
           {"name": "hostname", "type": "str", "default": None},
           {"name": "username", "type": "str", "default": None},
           {"name": "password", "type": "str", "default": ""},
           {"name": "port", "type": "int", "default": 22},
           {"name": "log_file", "type": "str", "default": "/var/log/auth.log"},
           {"name": "threshold", "type": "int", "default": 5},
           {"name": "timeout", "type": "int", "default": 30}
       ]},
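The parameter list above uses Python type names and None, so it is not yet a JSON Schema document. The sketch below shows one way it might be converted into a standard object schema. The helper to_json_schema, the type mapping, and the rule that a default of None marks a required parameter are assumptions, though that rule does agree with the Required column of the input table.

```python
# Assumed mapping from the Python type names used in the listing to JSON Schema types.
PY_TO_JSON_TYPE = {"str": "string", "int": "integer", "float": "number", "bool": "boolean"}

tool = {
    "name": "check_ssh_risk_logins",
    "parameters": [
        {"name": "hostname", "type": "str", "default": None},
        {"name": "username", "type": "str", "default": None},
        {"name": "password", "type": "str", "default": ""},
        {"name": "port", "type": "int", "default": 22},
        {"name": "log_file", "type": "str", "default": "/var/log/auth.log"},
        {"name": "threshold", "type": "int", "default": 5},
        {"name": "timeout", "type": "int", "default": 30},
    ],
}


def to_json_schema(tool: dict) -> dict:
    """Build a JSON Schema object from the tool's parameter list."""
    properties = {}
    required = []
    for param in tool["parameters"]:
        prop = {"type": PY_TO_JSON_TYPE[param["type"]]}
        if param["default"] is None:
            required.append(param["name"])  # assumption: no default means required
        else:
            prop["default"] = param["default"]
        properties[param["name"]] = prop
    return {"type": "object", "properties": properties, "required": required}


schema = to_json_schema(tool)
print(schema["required"])
print(schema["properties"]["port"])
```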