Skip to main content
Glama
Heht571
by Heht571

analyze_logs

Analyzes server log files to identify errors, warnings, and critical issues using customizable search patterns for troubleshooting.

Instructions

分析服务器日志文件中的错误和警告

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
hostnameYes
usernameYes
passwordNo
portNo
log_fileNo/var/log/syslog
patternNoerror|fail|critical
linesNo
timeoutNo

Implementation Reference

  • Primary handler function for the 'analyze_logs' tool. Connects to remote server via SSH, retrieves recent log lines, filters for error patterns, parses and categorizes log entries by level (critical, error, warning, fail, other), and returns a structured summary with counts and entries.
    @handle_exceptions
    def analyze_logs(
        hostname: str,
        username: str,
        password: str = "",
        port: int = 22,
        log_file: str = "/var/log/syslog",
        pattern: str = "error|fail|critical",
        lines: int = 100,
        timeout: int = 30
    ) -> dict:
        """分析服务器日志文件中的错误和警告"""
        result = {"status": "unknown", "entries": [], "summary": {}, "error": ""}
    
        try:
            with SSHManager(hostname, username, password, port, timeout) as ssh:
                # 获取日志的最后几行
                tail_command = f"tail -n {lines} {log_file}"
                stdin, stdout, stderr = ssh.exec_command(tail_command, timeout=timeout)
                log_output = stdout.read().decode().strip()
    
                if not log_output:
                    result["error"] = f"无法读取日志文件 {log_file}"
                    result["status"] = "error"
                    return result
    
                # 使用grep过滤包含指定模式的行
                grep_command = f"echo '{log_output}' | grep -E '{pattern}'"
                stdin, stdout, stderr = ssh.exec_command(grep_command, timeout=timeout)
                matched_output = stdout.read().decode().strip()
    
                # 初始化计数器
                pattern_counts = {
                    "critical": 0,
                    "error": 0,
                    "warning": 0,
                    "fail": 0,
                    "other": 0
                }
    
                entries = []
    
                for line in matched_output.split('\n'):
                    if not line:
                        continue
    
                    # 尝试提取时间戳
                    timestamp = ""
                    try:
                        # 假设日志的前部分是时间戳
                        timestamp_part = ' '.join(line.split()[:3])
                        timestamp = timestamp_part
                    except:
                        pass
    
                    # 确定日志级别
                    level = "other"
                    line_lower = line.lower()
                    if "critical" in line_lower:
                        level = "critical"
                        pattern_counts["critical"] += 1
                    elif "error" in line_lower:
                        level = "error"
                        pattern_counts["error"] += 1
                    elif "warning" in line_lower or "warn" in line_lower:
                        level = "warning"
                        pattern_counts["warning"] += 1
                    elif "fail" in line_lower:
                        level = "fail"
                        pattern_counts["fail"] += 1
                    else:
                        pattern_counts["other"] += 1
    
                    entries.append({
                        "timestamp": timestamp,
                        "level": level,
                        "message": line
                    })
    
                result["entries"] = entries
                result["summary"] = {
                    "total_entries": len(entries),
                    "counts_by_level": pattern_counts
                }
    
                result["status"] = "success"
    
        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)
    
        return result
  • Secondary handler function for the 'analyze_logs' tool in the non-SSE codebase. Nearly identical logic to the SSE version.
    @handle_exceptions
    def analyze_logs(
        hostname: str,
        username: str,
        password: str = "",
        port: int = 22,
        log_file: str = "/var/log/syslog",
        pattern: str = "error|fail|critical",
        lines: int = 100,
        timeout: int = 30
    ) -> dict:
        """分析服务器日志文件中的错误和警告"""
        result = {"status": "unknown", "entries": [], "summary": {}, "error": ""}
    
        try:
            with SSHManager(hostname, username, password, port, timeout) as ssh:
                # 获取日志的最后几行
                tail_command = f"tail -n {lines} {log_file}"
                stdin, stdout, stderr = ssh.exec_command(tail_command, timeout=timeout)
                log_output = stdout.read().decode().strip()
    
                if not log_output:
                    result["error"] = f"无法读取日志文件 {log_file}"
                    result["status"] = "error"
                    return result
    
                # 搜索匹配的日志条目
                grep_command = f"grep -E '{pattern}' <<< '{log_output}'"
                stdin, stdout, stderr = ssh.exec_command(grep_command, timeout=timeout)
                matched_output = stdout.read().decode().strip()
    
                # 解析匹配的日志条目
                entries = []
                pattern_counts = {"error": 0, "warning": 0, "critical": 0, "fail": 0, "other": 0}
    
                for line in matched_output.split('\n'):
                    if not line:
                        continue
    
                    # 尝试提取时间戳
                    timestamp = ""
                    try:
                        # 假设日志的前部分是时间戳
                        timestamp_part = ' '.join(line.split()[:3])
                        timestamp = timestamp_part
                    except:
                        pass
    
                    # 确定日志级别
                    level = "other"
                    line_lower = line.lower()
                    if "critical" in line_lower:
                        level = "critical"
                        pattern_counts["critical"] += 1
                    elif "error" in line_lower:
                        level = "error"
                        pattern_counts["error"] += 1
                    elif "warning" in line_lower or "warn" in line_lower:
                        level = "warning"
                        pattern_counts["warning"] += 1
                    elif "fail" in line_lower:
                        level = "fail"
                        pattern_counts["fail"] += 1
                    else:
                        pattern_counts["other"] += 1
    
                    entries.append({
                        "timestamp": timestamp,
                        "level": level,
                        "message": line
                    })
    
                result["entries"] = entries
                result["summary"] = {
                    "total_entries": len(entries),
                    "counts_by_level": pattern_counts
                }
    
                result["status"] = "success"
    
        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)
    
        return result
  • Tool dispatch/registration in the SSE server handler. Maps tool name 'analyze_logs' to the analyze_logs function call with argument validation.
    elif name == "analyze_logs":
        required_args = ["hostname", "username"]
        for arg in required_args:
            if arg not in arguments:
                raise ValueError(f"Missing required argument '{arg}'")
    
        result = analyze_logs(
            hostname=arguments["hostname"],
            username=arguments["username"],
            password=arguments.get("password", ""),
            port=arguments.get("port", 22),
            log_file=arguments.get("log_file", "/var/log/syslog"),
            pattern=arguments.get("pattern", "error|fail|critical"),
            lines=arguments.get("lines", 100),
            timeout=arguments.get("timeout", 30)
        )
  • Tool schema definition for 'analyze_logs', including name, description, and parameter types/defaults used in list_available_tools.
    {"name": "analyze_logs", "description": "分析服务器日志文件中的错误和警告", "parameters": [
        {"name": "hostname", "type": "str", "default": None},
        {"name": "username", "type": "str", "default": None},
        {"name": "password", "type": "str", "default": ""},
        {"name": "port", "type": "int", "default": 22},
        {"name": "log_file", "type": "str", "default": "/var/log/syslog"},
        {"name": "pattern", "type": "str", "default": "error|fail|critical"},
        {"name": "lines", "type": "int", "default": 100},
        {"name": "timeout", "type": "int", "default": 30}
    ]},
  • Tool registration dictionary mapping 'analyze_logs' to the handler function, used to dynamically register tools in FastMCP.
    tools_dict = {
        'get_memory_info': get_memory_info,
        'remote_server_inspection': remote_server_inspection,
        'get_system_load': get_system_load,
        'monitor_processes': monitor_processes,
        'check_service_status': check_service_status,
        'get_os_details': get_os_details,
        'check_ssh_risk_logins': check_ssh_risk_logins,
        'check_firewall_config': check_firewall_config,
        'security_vulnerability_scan': security_vulnerability_scan,
        'backup_critical_files': backup_critical_files,
        'inspect_network': inspect_network,
        'analyze_logs': analyze_logs,
        'list_docker_containers': list_docker_containers,
        'list_docker_images': list_docker_images,
        'list_docker_volumes': list_docker_volumes,
        'get_container_logs': get_container_logs,
        'monitor_container_stats': monitor_container_stats,
        'check_docker_health': check_docker_health
    }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server