# analyze_logs
Analyzes server log files to identify errors, warnings, and critical issues using customizable search patterns for troubleshooting.
## Instructions
Analyze server log files for errors and warnings.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| hostname | Yes | Hostname or IP address of the remote server | |
| username | Yes | SSH username | |
| password | No | SSH password | `""` |
| port | No | SSH port | `22` |
| log_file | No | Path of the log file to analyze | `/var/log/syslog` |
| pattern | No | Extended regular expression (passed to `grep -E`) used to filter log lines | `error\|fail\|critical` |
| lines | No | Number of most recent log lines to scan | `100` |
| timeout | No | SSH connection and command timeout in seconds | `30` |
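
As a concrete illustration of the schema, the snippet below builds an argument dictionary for a hypothetical call; the host, user, and overridden values are placeholders, and any omitted key falls back to the defaults listed above.

```python
# Hypothetical argument payload for an analyze_logs call.
# Only hostname and username are required; every omitted key
# falls back to the defaults in the table above.
arguments = {
    "hostname": "203.0.113.10",               # placeholder server address
    "username": "ops",                        # placeholder SSH user
    "log_file": "/var/log/auth.log",          # override the /var/log/syslog default
    "pattern": "error|fail|critical|denied",  # extended regex handed to grep -E
    "lines": 500,                             # scan more history than the default 100
}
```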
## Implementation Reference
- Primary handler function for the 'analyze_logs' tool (SSE codebase). Connects to the remote server over SSH, retrieves the most recent log lines, filters them for error patterns, parses and categorizes each entry by level (critical, error, warning, fail, other), and returns a structured summary with counts and entries.

  ```python
  @handle_exceptions
  def analyze_logs(
      hostname: str,
      username: str,
      password: str = "",
      port: int = 22,
      log_file: str = "/var/log/syslog",
      pattern: str = "error|fail|critical",
      lines: int = 100,
      timeout: int = 30
  ) -> dict:
      """Analyze server log files for errors and warnings."""
      result = {"status": "unknown", "entries": [], "summary": {}, "error": ""}
      try:
          with SSHManager(hostname, username, password, port, timeout) as ssh:
              # Fetch the last N lines of the log
              tail_command = f"tail -n {lines} {log_file}"
              stdin, stdout, stderr = ssh.exec_command(tail_command, timeout=timeout)
              log_output = stdout.read().decode().strip()

              if not log_output:
                  result["error"] = f"Unable to read log file {log_file}"
                  result["status"] = "error"
                  return result

              # Filter the retrieved lines for the requested pattern with grep
              grep_command = f"echo '{log_output}' | grep -E '{pattern}'"
              stdin, stdout, stderr = ssh.exec_command(grep_command, timeout=timeout)
              matched_output = stdout.read().decode().strip()

              # Initialize per-level counters
              pattern_counts = {
                  "critical": 0,
                  "error": 0,
                  "warning": 0,
                  "fail": 0,
                  "other": 0
              }
              entries = []

              for line in matched_output.split('\n'):
                  if not line:
                      continue

                  # Try to extract a timestamp
                  timestamp = ""
                  try:
                      # Assume the first three fields are the timestamp
                      timestamp_part = ' '.join(line.split()[:3])
                      timestamp = timestamp_part
                  except:
                      pass

                  # Determine the log level
                  level = "other"
                  line_lower = line.lower()
                  if "critical" in line_lower:
                      level = "critical"
                      pattern_counts["critical"] += 1
                  elif "error" in line_lower:
                      level = "error"
                      pattern_counts["error"] += 1
                  elif "warning" in line_lower or "warn" in line_lower:
                      level = "warning"
                      pattern_counts["warning"] += 1
                  elif "fail" in line_lower:
                      level = "fail"
                      pattern_counts["fail"] += 1
                  else:
                      pattern_counts["other"] += 1

                  entries.append({
                      "timestamp": timestamp,
                      "level": level,
                      "message": line
                  })

              result["entries"] = entries
              result["summary"] = {
                  "total_entries": len(entries),
                  "counts_by_level": pattern_counts
              }
              result["status"] = "success"

      except Exception as e:
          result["status"] = "error"
          result["error"] = str(e)

      return result
  ```
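  A minimal sketch of calling this handler directly and reading the result, assuming the function is importable from the SSE server module (the import path, credentials, and the assumption that `handle_exceptions` returns the handler's dict unchanged on success are all illustrative):

  ```python
  # Direct-call sketch; module path and host details are assumptions for illustration.
  from server_monitor_sse.server import analyze_logs

  report = analyze_logs(
      hostname="203.0.113.10",   # placeholder server address
      username="ops",            # placeholder SSH user
      password="s3cret",         # placeholder; leave empty if keys are used
      pattern="error|fail|critical",
      lines=200,
  )

  if report["status"] == "success":
      print(report["summary"]["counts_by_level"])
      for entry in report["entries"][:5]:
          print(entry["level"], entry["timestamp"], entry["message"])
  else:
      print("analyze_logs failed:", report["error"])
  ```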
- Secondary handler function for the 'analyze_logs' tool in the non-SSE codebase. Nearly identical logic to the SSE version; the main difference is that the grep filter is fed through a here-string instead of an echo pipe.

  ```python
  @handle_exceptions
  def analyze_logs(
      hostname: str,
      username: str,
      password: str = "",
      port: int = 22,
      log_file: str = "/var/log/syslog",
      pattern: str = "error|fail|critical",
      lines: int = 100,
      timeout: int = 30
  ) -> dict:
      """Analyze server log files for errors and warnings."""
      result = {"status": "unknown", "entries": [], "summary": {}, "error": ""}
      try:
          with SSHManager(hostname, username, password, port, timeout) as ssh:
              # Fetch the last N lines of the log
              tail_command = f"tail -n {lines} {log_file}"
              stdin, stdout, stderr = ssh.exec_command(tail_command, timeout=timeout)
              log_output = stdout.read().decode().strip()

              if not log_output:
                  result["error"] = f"Unable to read log file {log_file}"
                  result["status"] = "error"
                  return result

              # Search the retrieved lines for matching log entries
              grep_command = f"grep -E '{pattern}' <<< '{log_output}'"
              stdin, stdout, stderr = ssh.exec_command(grep_command, timeout=timeout)
              matched_output = stdout.read().decode().strip()

              # Parse the matching log entries
              entries = []
              pattern_counts = {"error": 0, "warning": 0, "critical": 0, "fail": 0, "other": 0}

              for line in matched_output.split('\n'):
                  if not line:
                      continue

                  # Try to extract a timestamp
                  timestamp = ""
                  try:
                      # Assume the first three fields are the timestamp
                      timestamp_part = ' '.join(line.split()[:3])
                      timestamp = timestamp_part
                  except:
                      pass

                  # Determine the log level
                  level = "other"
                  line_lower = line.lower()
                  if "critical" in line_lower:
                      level = "critical"
                      pattern_counts["critical"] += 1
                  elif "error" in line_lower:
                      level = "error"
                      pattern_counts["error"] += 1
                  elif "warning" in line_lower or "warn" in line_lower:
                      level = "warning"
                      pattern_counts["warning"] += 1
                  elif "fail" in line_lower:
                      level = "fail"
                      pattern_counts["fail"] += 1
                  else:
                      pattern_counts["other"] += 1

                  entries.append({
                      "timestamp": timestamp,
                      "level": level,
                      "message": line
                  })

              result["entries"] = entries
              result["summary"] = {
                  "total_entries": len(entries),
                  "counts_by_level": pattern_counts
              }
              result["status"] = "success"

      except Exception as e:
          result["status"] = "error"
          result["error"] = str(e)

      return result
  ```
- server_monitor_sse/server.py:202-218 (registration). Tool dispatch in the SSE server handler: maps the tool name 'analyze_logs' to the analyze_logs function call after validating the required arguments.

  ```python
  elif name == "analyze_logs":
      required_args = ["hostname", "username"]
      for arg in required_args:
          if arg not in arguments:
              raise ValueError(f"Missing required argument '{arg}'")

      result = analyze_logs(
          hostname=arguments["hostname"],
          username=arguments["username"],
          password=arguments.get("password", ""),
          port=arguments.get("port", 22),
          log_file=arguments.get("log_file", "/var/log/syslog"),
          pattern=arguments.get("pattern", "error|fail|critical"),
          lines=arguments.get("lines", 100),
          timeout=arguments.get("timeout", 30)
      )
  ```
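  For context, a client-side invocation that would reach this dispatch branch might look like the sketch below. It assumes the MCP Python SDK's SSE transport and a server listening at the placeholder URL; the exact helper names can vary between SDK versions.

  ```python
  # Hedged client-side sketch (MCP Python SDK, SSE transport).
  # The URL and argument values are placeholders.
  import asyncio
  from mcp import ClientSession
  from mcp.client.sse import sse_client

  async def main() -> None:
      async with sse_client("http://localhost:8000/sse") as (read, write):
          async with ClientSession(read, write) as session:
              await session.initialize()
              result = await session.call_tool(
                  "analyze_logs",
                  arguments={"hostname": "203.0.113.10", "username": "ops"},
              )
              print(result)

  asyncio.run(main())
  ```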
- Tool schema definition for 'analyze_logs', including name, description, and parameter types/defaults used in list_available_tools.

  ```python
  {"name": "analyze_logs",
   "description": "Analyze server log files for errors and warnings",
   "parameters": [
       {"name": "hostname", "type": "str", "default": None},
       {"name": "username", "type": "str", "default": None},
       {"name": "password", "type": "str", "default": ""},
       {"name": "port", "type": "int", "default": 22},
       {"name": "log_file", "type": "str", "default": "/var/log/syslog"},
       {"name": "pattern", "type": "str", "default": "error|fail|critical"},
       {"name": "lines", "type": "int", "default": 100},
       {"name": "timeout", "type": "int", "default": 30}
   ]},
  ```
- server_monitor/main.py:42-61 (registration). Tool registration dictionary mapping 'analyze_logs' to its handler function, used to dynamically register tools in FastMCP (a hedged registration sketch follows below).

  ```python
  tools_dict = {
      'get_memory_info': get_memory_info,
      'remote_server_inspection': remote_server_inspection,
      'get_system_load': get_system_load,
      'monitor_processes': monitor_processes,
      'check_service_status': check_service_status,
      'get_os_details': get_os_details,
      'check_ssh_risk_logins': check_ssh_risk_logins,
      'check_firewall_config': check_firewall_config,
      'security_vulnerability_scan': security_vulnerability_scan,
      'backup_critical_files': backup_critical_files,
      'inspect_network': inspect_network,
      'analyze_logs': analyze_logs,
      'list_docker_containers': list_docker_containers,
      'list_docker_images': list_docker_images,
      'list_docker_volumes': list_docker_volumes,
      'get_container_logs': get_container_logs,
      'monitor_container_stats': monitor_container_stats,
      'check_docker_health': check_docker_health
  }
  ```
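A minimal sketch of how such a dictionary could drive registration, assuming the FastMCP server class from the MCP Python SDK; the server name is a placeholder, `tools_dict` is the mapping shown above, and the project may register tools differently in practice.

```python
# Hedged registration sketch, assuming mcp.server.fastmcp.FastMCP.
# "Server Monitor" is a placeholder name; tools_dict is the mapping above.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Server Monitor")

for tool_name, handler in tools_dict.items():
    # add_tool registers a plain function as a tool, deriving its input schema
    # from the handler's signature and using its docstring as the description.
    mcp.add_tool(handler, name=tool_name)
```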