Skip to main content
Glama

check_ssh_risk_logins

Analyze SSH login logs to detect failed attempts and suspicious IP addresses, identifying potential security threats on remote servers.

Instructions

检查SSH登录风险,包括失败尝试和可疑IP

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
hostnameYes
usernameYes
passwordNo
portNo
log_fileNo/var/log/auth.log
thresholdNo
timeoutNo

Implementation Reference

  • Primary handler implementation for the check_ssh_risk_logins tool. Connects via SSH, locates auth log, parses SSH login attempts, identifies suspicious IPs based on failure threshold.
    def check_ssh_risk_logins( hostname: str, username: str, password: str = "", port: int = 22, log_file: str = "/var/log/auth.log", threshold: int = 5, timeout: int = 30 ) -> dict: """检查SSH登录风险,包括失败尝试和可疑IP""" result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {}, "success_logins": [], "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查日志文件是否存在 file_check = f"[ -f {log_file} ] && echo 'exists' || echo 'not found'" stdin, stdout, stderr = ssh.exec_command(file_check, timeout=timeout) file_exists = stdout.read().decode().strip() == "exists" # 如果主日志不存在,尝试备用日志文件 if not file_exists: alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"] for alt_log in alternative_logs: file_check = f"[ -f {alt_log} ] && echo 'exists' || echo 'not found'" stdin, stdout, stderr = ssh.exec_command(file_check, timeout=timeout) if stdout.read().decode().strip() == "exists": log_file = alt_log file_exists = True break if not file_exists: result["error"] = "找不到SSH日志文件" result["status"] = "error" return result # 获取日志内容 log_command = f"grep 'sshd' {log_file} | tail -n 1000" stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout) log_content = stdout.read().decode().strip() # 解析日志 failed_logins, success_logins = ServerInspector.parse_auth_log(log_content) # 找出超过阈值的可疑IP suspicious_ips = [ {"ip": ip, "attempts": count, "risk_level": "high" if count > threshold * 2 else "medium"} for ip, count in failed_logins.items() if count >= threshold ] # 按尝试次数排序 suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True) result["suspicious_ips"] = suspicious_ips result["failed_logins"] = failed_logins result["success_logins"] = success_logins result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result
  • SSE variant handler for check_ssh_risk_logins tool, similar logic for SSH login risk assessment.
    def check_ssh_risk_logins( hostname: str, username: str, password: str = "", port: int = 22, log_file: str = "/var/log/auth.log", threshold: int = 5, timeout: int = 30 ) -> dict: """检查SSH登录风险,包括失败尝试和可疑IP""" result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {}, "success_logins": [], "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查日志文件是否存在 stdin, stdout, stderr = ssh.exec_command(f"ls {log_file}", timeout=timeout) if stderr.read().decode().strip(): # 尝试其他常见的日志文件 alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"] for alt_log in alternative_logs: stdin, stdout, stderr = ssh.exec_command(f"ls {alt_log}", timeout=timeout) if not stderr.read().decode().strip(): log_file = alt_log break else: result["status"] = "error" result["error"] = f"找不到SSH日志文件: {log_file} 或其他常见日志文件" return result # 获取日志内容 log_command = f"grep 'sshd' {log_file} | tail -n 1000" stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout) log_content = stdout.read().decode().strip() # 解析日志 failed_logins, success_logins = ServerInspector.parse_auth_log(log_content) # 找出超过阈值的可疑IP suspicious_ips = [ {"ip": ip, "attempts": count, "risk_level": "high" if count > threshold * 2 else "medium"} for ip, count in failed_logins.items() if count >= threshold ] # 按尝试次数排序 suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True) result["suspicious_ips"] = suspicious_ips result["failed_logins"] = failed_logins result["success_logins"] = success_logins result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result
  • Tool registration in MCP server: imports the function and registers it dynamically with mcp.tool() decorator.
    tools_dict = { 'get_memory_info': get_memory_info, 'remote_server_inspection': remote_server_inspection, 'get_system_load': get_system_load, 'monitor_processes': monitor_processes, 'check_service_status': check_service_status, 'get_os_details': get_os_details, 'check_ssh_risk_logins': check_ssh_risk_logins, 'check_firewall_config': check_firewall_config, 'security_vulnerability_scan': security_vulnerability_scan, 'backup_critical_files': backup_critical_files, 'inspect_network': inspect_network, 'analyze_logs': analyze_logs, 'list_docker_containers': list_docker_containers, 'list_docker_images': list_docker_images, 'list_docker_volumes': list_docker_volumes, 'get_container_logs': get_container_logs, 'monitor_container_stats': monitor_container_stats, 'check_docker_health': check_docker_health }
  • Dispatch registration in SSE server: handles tool calls via if-elif and invokes check_ssh_risk_logins.
    elif name == "check_ssh_risk_logins": required_args = ["hostname", "username"] for arg in required_args: if arg not in arguments: raise ValueError(f"Missing required argument '{arg}'") result = check_ssh_risk_logins( hostname=arguments["hostname"], username=arguments["username"], password=arguments.get("password", ""), port=arguments.get("port", 22), log_file=arguments.get("log_file", "/var/log/auth.log"), threshold=arguments.get("threshold", 5), timeout=arguments.get("timeout", 30) )
  • JSON schema definition for the tool's input parameters used in tool listing.
    {"name": "check_ssh_risk_logins", "description": "检查SSH登录风险,包括失败尝试和可疑IP", "parameters": [ {"name": "hostname", "type": "str", "default": None}, {"name": "username", "type": "str", "default": None}, {"name": "password", "type": "str", "default": ""}, {"name": "port", "type": "int", "default": 22}, {"name": "log_file", "type": "str", "default": "/var/log/auth.log"}, {"name": "threshold", "type": "int", "default": 5}, {"name": "timeout", "type": "int", "default": 30} ]},

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server