Skip to main content
Glama
Heht571
by Heht571

check_ssh_risk_logins

Analyze SSH login logs to detect failed attempts and suspicious IP addresses, identifying potential security threats on remote servers.

Instructions

检查SSH登录风险,包括失败尝试和可疑IP

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
hostnameYes
usernameYes
passwordNo
portNo
log_fileNo/var/log/auth.log
thresholdNo
timeoutNo

Implementation Reference

  • Primary handler implementation for the check_ssh_risk_logins tool. Connects via SSH, locates auth log, parses SSH login attempts, identifies suspicious IPs based on failure threshold.
    def check_ssh_risk_logins(
        hostname: str,
        username: str,
        password: str = "",
        port: int = 22,
        log_file: str = "/var/log/auth.log",
        threshold: int = 5,
        timeout: int = 30
    ) -> dict:
        """检查SSH登录风险,包括失败尝试和可疑IP"""
        result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {}, "success_logins": [], "error": ""}
    
        try:
            with SSHManager(hostname, username, password, port, timeout) as ssh:
                # 检查日志文件是否存在
                file_check = f"[ -f {log_file} ] && echo 'exists' || echo 'not found'"
                stdin, stdout, stderr = ssh.exec_command(file_check, timeout=timeout)
                file_exists = stdout.read().decode().strip() == "exists"
    
                # 如果主日志不存在,尝试备用日志文件
                if not file_exists:
                    alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"]
                    for alt_log in alternative_logs:
                        file_check = f"[ -f {alt_log} ] && echo 'exists' || echo 'not found'"
                        stdin, stdout, stderr = ssh.exec_command(file_check, timeout=timeout)
                        if stdout.read().decode().strip() == "exists":
                            log_file = alt_log
                            file_exists = True
                            break
    
                if not file_exists:
                    result["error"] = "找不到SSH日志文件"
                    result["status"] = "error"
                    return result
    
                # 获取日志内容
                log_command = f"grep 'sshd' {log_file} | tail -n 1000"
                stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout)
                log_content = stdout.read().decode().strip()
    
                # 解析日志
                failed_logins, success_logins = ServerInspector.parse_auth_log(log_content)
    
                # 找出超过阈值的可疑IP
                suspicious_ips = [
                    {"ip": ip, "attempts": count, "risk_level": "high" if count > threshold * 2 else "medium"}
                    for ip, count in failed_logins.items()
                    if count >= threshold
                ]
    
                # 按尝试次数排序
                suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True)
    
                result["suspicious_ips"] = suspicious_ips
                result["failed_logins"] = failed_logins
                result["success_logins"] = success_logins
                result["status"] = "success"
    
        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)
    
        return result
  • SSE variant handler for check_ssh_risk_logins tool, similar logic for SSH login risk assessment.
    def check_ssh_risk_logins(
        hostname: str,
        username: str,
        password: str = "",
        port: int = 22,
        log_file: str = "/var/log/auth.log",
        threshold: int = 5,
        timeout: int = 30
    ) -> dict:
        """检查SSH登录风险,包括失败尝试和可疑IP"""
        result = {"status": "unknown", "suspicious_ips": [], "failed_logins": {}, "success_logins": [], "error": ""}
    
        try:
            with SSHManager(hostname, username, password, port, timeout) as ssh:
                # 检查日志文件是否存在
                stdin, stdout, stderr = ssh.exec_command(f"ls {log_file}", timeout=timeout)
                if stderr.read().decode().strip():
                    # 尝试其他常见的日志文件
                    alternative_logs = ["/var/log/secure", "/var/log/audit/audit.log"]
                    for alt_log in alternative_logs:
                        stdin, stdout, stderr = ssh.exec_command(f"ls {alt_log}", timeout=timeout)
                        if not stderr.read().decode().strip():
                            log_file = alt_log
                            break
                    else:
                        result["status"] = "error"
                        result["error"] = f"找不到SSH日志文件: {log_file} 或其他常见日志文件"
                        return result
    
                # 获取日志内容
                log_command = f"grep 'sshd' {log_file} | tail -n 1000"
                stdin, stdout, stderr = ssh.exec_command(log_command, timeout=timeout)
                log_content = stdout.read().decode().strip()
    
                # 解析日志
                failed_logins, success_logins = ServerInspector.parse_auth_log(log_content)
    
                # 找出超过阈值的可疑IP
                suspicious_ips = [
                    {"ip": ip, "attempts": count, "risk_level": "high" if count > threshold * 2 else "medium"}
                    for ip, count in failed_logins.items()
                    if count >= threshold
                ]
    
                # 按尝试次数排序
                suspicious_ips.sort(key=lambda x: x["attempts"], reverse=True)
    
                result["suspicious_ips"] = suspicious_ips
                result["failed_logins"] = failed_logins
                result["success_logins"] = success_logins
                result["status"] = "success"
    
        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)
    
        return result
  • Tool registration in MCP server: imports the function and registers it dynamically with mcp.tool() decorator.
    tools_dict = {
        'get_memory_info': get_memory_info,
        'remote_server_inspection': remote_server_inspection,
        'get_system_load': get_system_load,
        'monitor_processes': monitor_processes,
        'check_service_status': check_service_status,
        'get_os_details': get_os_details,
        'check_ssh_risk_logins': check_ssh_risk_logins,
        'check_firewall_config': check_firewall_config,
        'security_vulnerability_scan': security_vulnerability_scan,
        'backup_critical_files': backup_critical_files,
        'inspect_network': inspect_network,
        'analyze_logs': analyze_logs,
        'list_docker_containers': list_docker_containers,
        'list_docker_images': list_docker_images,
        'list_docker_volumes': list_docker_volumes,
        'get_container_logs': get_container_logs,
        'monitor_container_stats': monitor_container_stats,
        'check_docker_health': check_docker_health
    }
  • Dispatch registration in SSE server: handles tool calls via if-elif and invokes check_ssh_risk_logins.
    elif name == "check_ssh_risk_logins":
        required_args = ["hostname", "username"]
        for arg in required_args:
            if arg not in arguments:
                raise ValueError(f"Missing required argument '{arg}'")
    
        result = check_ssh_risk_logins(
            hostname=arguments["hostname"],
            username=arguments["username"],
            password=arguments.get("password", ""),
            port=arguments.get("port", 22),
            log_file=arguments.get("log_file", "/var/log/auth.log"),
            threshold=arguments.get("threshold", 5),
            timeout=arguments.get("timeout", 30)
        )
  • JSON schema definition for the tool's input parameters used in tool listing.
    {"name": "check_ssh_risk_logins", "description": "检查SSH登录风险,包括失败尝试和可疑IP", "parameters": [
        {"name": "hostname", "type": "str", "default": None},
        {"name": "username", "type": "str", "default": None},
        {"name": "password", "type": "str", "default": ""},
        {"name": "port", "type": "int", "default": 22},
        {"name": "log_file", "type": "str", "default": "/var/log/auth.log"},
        {"name": "threshold", "type": "int", "default": 5},
        {"name": "timeout", "type": "int", "default": 30}
    ]},
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It mentions checking '失败尝试和可疑IP' (failed attempts and suspicious IPs), which hints at analysis behavior, but doesn't describe what the tool actually does (e.g., reads logs, analyzes patterns, returns risk scores), potential side effects (e.g., network connections, log parsing), authentication needs (implied by SSH parameters but not stated), or output format. For a tool with 7 parameters and no annotations, this is a significant gap.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence in Chinese: '检查SSH登录风险,包括失败尝试和可疑IP'. It's front-loaded with the core purpose and includes key elements (SSH, risks, failed attempts, suspicious IPs) without waste. Every word earns its place, making it highly concise and well-structured.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity (7 parameters, no annotations, no output schema), the description is incomplete. It doesn't explain the tool's behavior, parameter usage, or output, leaving gaps in understanding how to invoke it correctly. For a security analysis tool with multiple inputs, more detail is needed to be complete, especially without annotations or output schema to fill in the gaps.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the description must compensate for undocumented parameters. It adds no meaning beyond the schema—it doesn't explain what 'hostname', 'username', 'password', etc., are used for (e.g., SSH connection details vs. log analysis), how 'threshold' relates to risk detection, or what 'log_file' defaults to. With 7 parameters and no schema descriptions, the description fails to provide necessary context, scoring below the baseline of 3.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: '检查SSH登录风险,包括失败尝试和可疑IP' (Check SSH login risks, including failed attempts and suspicious IPs). It specifies the verb ('检查' - check) and resource ('SSH登录风险' - SSH login risks), and distinguishes it from siblings like 'analyze_logs' or 'security_vulnerability_scan' by focusing specifically on SSH. However, it doesn't explicitly differentiate from all siblings (e.g., 'check_firewall_config' might overlap in security context), so it's not a perfect 5.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. It doesn't mention prerequisites (e.g., SSH access, log file availability), exclusions (e.g., not for non-SSH logs), or compare it to siblings like 'analyze_logs' (general log analysis) or 'security_vulnerability_scan' (broader security checks). Usage is implied by the name and description but not explicitly stated.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server