remote_server_inspection
Execute remote server inspections to check CPU, memory, disk, and other system health metrics via SSH connection for monitoring and troubleshooting purposes.
Instructions
执行远程服务器巡检
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| hostname | Yes | ||
| username | Yes | ||
| password | No | ||
| port | No | ||
| inspection_modules | No | ||
| timeout | No | ||
| use_connection_cache | No |
Implementation Reference
- Primary implementation of the remote_server_inspection tool. Connects via SSH, executes commands for specified inspection modules (cpu, memory, disk, etc.), parses outputs using ServerInspector, tracks success per module, generates summary.def remote_server_inspection( hostname: str, username: str, password: str = "", port: int = 22, inspection_modules: list[str] = ["cpu", "memory", "disk"], timeout: int = 30, use_connection_cache: bool = True ) -> dict: """执行远程服务器巡检""" result = InspectionResult() logger.info(f"开始对 {hostname} 执行服务器巡检,模块: {inspection_modules}") # 定义命令映射,使用更高效的命令 commands = { "cpu": "top -bn1 | grep 'Cpu(s)' && uptime", "memory": "free -m", "disk": "df -h", # 添加更多模块的命令 "io": "iostat -x 1 2 | tail -n +4", "network": "netstat -i" } # 验证模块 valid_modules = [m for m in inspection_modules if m in commands] if len(valid_modules) != len(inspection_modules): invalid_modules = set(inspection_modules) - set(valid_modules) logger.warning(f"忽略无效的巡检模块: {invalid_modules}") # 如果没有有效模块,提前返回 if not valid_modules: result.status = "error" result.error = "没有有效的巡检模块" result.summary = "巡检失败:没有有效的巡检模块" return result.dict() # 使用优化的SSH连接管理器 with SSHManager(hostname, username, password, port, timeout, use_cache=use_connection_cache) as ssh: # 并行执行所有命令 module_results = {} for module in valid_modules: try: # 执行命令 stdin, stdout, stderr = ssh.exec_command(commands[module], timeout=timeout) raw_output = stdout.read().decode().strip() error_output = stderr.read().decode().strip() if error_output: logger.warning(f"模块 {module} 执行时有错误输出: {error_output}") result.raw_outputs[module] = raw_output # 解析结果 if not raw_output: logger.warning(f"模块 {module} 没有输出") continue # 使用模式匹配解析不同模块的输出 match module: case "cpu": result.data[module] = ServerInspector.parse_cpu(raw_output) case "memory": result.data[module] = ServerInspector.parse_memory(raw_output).dict() case "disk": result.data[module] = ServerInspector.parse_disk(raw_output) case "io": # 这里可以添加IO解析逻辑 pass case "network": # 这里可以添加网络解析逻辑 pass module_results[module] = "success" except Exception as e: logger.error(f"模块 {module} 执行失败: {str(e)}") module_results[module] = f"failed: {str(e)}" # 生成摘要 success_modules = [m for m, status in module_results.items() if status == "success"] failed_modules = [m for m, status in module_results.items() if status != "success"] if failed_modules: if success_modules: result.summary = f"部分模块巡检成功 ({len(success_modules)}/{len(valid_modules)})" else: result.summary = "所有模块巡检失败" else: result.summary = "服务器巡检成功" result.status = "error" if not success_modules else "success" logger.info(f"完成对 {hostname} 的服务器巡检,状态: {result.status}") return result.dict()
- SSE variant of the remote_server_inspection handler. Performs basic SSH inspection for modules without detailed per-module error handling.def remote_server_inspection( hostname: str, username: str, password: str = "", port: int = 22, inspection_modules: list[str] = ["cpu", "memory", "disk"], timeout: int = 30, use_connection_cache: bool = True ) -> dict: """执行远程服务器巡检""" result = InspectionResult() logger.info(f"开始对 {hostname} 执行服务器巡检,模块: {inspection_modules}") # 定义命令映射,使用更高效的命令 commands = { "cpu": "top -bn1 | grep 'Cpu(s)' && uptime", "memory": "free -m", "disk": "df -h", # 添加更多模块的命令 "io": "iostat -x 1 2 | tail -n +4", "network": "netstat -i" } try: with SSHManager(hostname, username, password, port, timeout, use_connection_cache) as ssh: # 执行每个模块的命令并解析结果 for module in inspection_modules: if module in commands: stdin, stdout, stderr = ssh.exec_command(commands[module], timeout=timeout) output = stdout.read().decode().strip() error = stderr.read().decode().strip() if error: logger.warning(f"执行 {module} 命令时出现警告: {error}") # 存储原始输出 result.raw_outputs[module] = output # 解析结果 if module == "cpu": result.data[module] = ServerInspector.parse_cpu(output) elif module == "memory": result.data[module] = ServerInspector.parse_memory(output) elif module == "disk": result.data[module] = ServerInspector.parse_disk(output) # 可以添加更多模块的解析逻辑 else: logger.warning(f"未知的巡检模块: {module}") # 设置状态和汇总信息 result.status = "success" result.summary = f"完成对 {hostname} 的服务器巡检,检查了 {len(inspection_modules)} 个模块" except Exception as e: result.status = "error" result.error = str(e) logger.error(f"服务器巡检失败: {str(e)}") return result.dict()
- server_monitor/tools/utils.py:45-52 (schema)JSON schema definition for the remote_server_inspection tool parameters used in list_available_tools.{"name": "remote_server_inspection", "description": "执行远程服务器巡检", "parameters": [ {"name": "hostname", "type": "str", "default": None}, {"name": "username", "type": "str", "default": None}, {"name": "password", "type": "str", "default": ""}, {"name": "port", "type": "int", "default": 22}, {"name": "inspection_modules", "type": "list[str]", "default": ["cpu", "memory", "disk"]}, {"name": "timeout", "type": "int", "default": 30} ]},
- server_monitor/main.py:42-66 (registration)Registration of the tool in MCP server: included in tools_dict and decorated with mcp.tool().tools_dict = { 'get_memory_info': get_memory_info, 'remote_server_inspection': remote_server_inspection, 'get_system_load': get_system_load, 'monitor_processes': monitor_processes, 'check_service_status': check_service_status, 'get_os_details': get_os_details, 'check_ssh_risk_logins': check_ssh_risk_logins, 'check_firewall_config': check_firewall_config, 'security_vulnerability_scan': security_vulnerability_scan, 'backup_critical_files': backup_critical_files, 'inspect_network': inspect_network, 'analyze_logs': analyze_logs, 'list_docker_containers': list_docker_containers, 'list_docker_images': list_docker_images, 'list_docker_volumes': list_docker_volumes, 'get_container_logs': get_container_logs, 'monitor_container_stats': monitor_container_stats, 'check_docker_health': check_docker_health } # 使用装饰器动态注册所有工具 for name, func in tools_dict.items(): mcp.tool()(func)
- server_monitor_sse/server.py:52-66 (registration)Tool dispatch registration in SSE server: handles tool calls by name and invokes remote_server_inspection.if name == "remote_server_inspection": required_args = ["hostname", "username"] for arg in required_args: if arg not in arguments: raise ValueError(f"Missing required argument '{arg}'") result = remote_server_inspection( hostname=arguments["hostname"], username=arguments["username"], password=arguments.get("password", ""), port=arguments.get("port", 22), inspection_modules=arguments.get("inspection_modules", ["cpu", "memory", "disk"]), timeout=arguments.get("timeout", 30), use_connection_cache=arguments.get("use_connection_cache", True) )