Skip to main content
Glama

remote_server_inspection

Execute remote server inspections to check CPU, memory, disk, and other system health metrics via SSH connection for monitoring and troubleshooting purposes.

Instructions

执行远程服务器巡检

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
hostnameYes
usernameYes
passwordNo
portNo
inspection_modulesNo
timeoutNo
use_connection_cacheNo

Implementation Reference

  • Primary implementation of the remote_server_inspection tool. Connects via SSH, executes commands for specified inspection modules (cpu, memory, disk, etc.), parses outputs using ServerInspector, tracks success per module, generates summary.
    def remote_server_inspection( hostname: str, username: str, password: str = "", port: int = 22, inspection_modules: list[str] = ["cpu", "memory", "disk"], timeout: int = 30, use_connection_cache: bool = True ) -> dict: """执行远程服务器巡检""" result = InspectionResult() logger.info(f"开始对 {hostname} 执行服务器巡检,模块: {inspection_modules}") # 定义命令映射,使用更高效的命令 commands = { "cpu": "top -bn1 | grep 'Cpu(s)' && uptime", "memory": "free -m", "disk": "df -h", # 添加更多模块的命令 "io": "iostat -x 1 2 | tail -n +4", "network": "netstat -i" } # 验证模块 valid_modules = [m for m in inspection_modules if m in commands] if len(valid_modules) != len(inspection_modules): invalid_modules = set(inspection_modules) - set(valid_modules) logger.warning(f"忽略无效的巡检模块: {invalid_modules}") # 如果没有有效模块,提前返回 if not valid_modules: result.status = "error" result.error = "没有有效的巡检模块" result.summary = "巡检失败:没有有效的巡检模块" return result.dict() # 使用优化的SSH连接管理器 with SSHManager(hostname, username, password, port, timeout, use_cache=use_connection_cache) as ssh: # 并行执行所有命令 module_results = {} for module in valid_modules: try: # 执行命令 stdin, stdout, stderr = ssh.exec_command(commands[module], timeout=timeout) raw_output = stdout.read().decode().strip() error_output = stderr.read().decode().strip() if error_output: logger.warning(f"模块 {module} 执行时有错误输出: {error_output}") result.raw_outputs[module] = raw_output # 解析结果 if not raw_output: logger.warning(f"模块 {module} 没有输出") continue # 使用模式匹配解析不同模块的输出 match module: case "cpu": result.data[module] = ServerInspector.parse_cpu(raw_output) case "memory": result.data[module] = ServerInspector.parse_memory(raw_output).dict() case "disk": result.data[module] = ServerInspector.parse_disk(raw_output) case "io": # 这里可以添加IO解析逻辑 pass case "network": # 这里可以添加网络解析逻辑 pass module_results[module] = "success" except Exception as e: logger.error(f"模块 {module} 执行失败: {str(e)}") module_results[module] = f"failed: {str(e)}" # 生成摘要 success_modules = [m for m, status in module_results.items() if status == "success"] failed_modules = [m for m, status in module_results.items() if status != "success"] if failed_modules: if success_modules: result.summary = f"部分模块巡检成功 ({len(success_modules)}/{len(valid_modules)})" else: result.summary = "所有模块巡检失败" else: result.summary = "服务器巡检成功" result.status = "error" if not success_modules else "success" logger.info(f"完成对 {hostname} 的服务器巡检,状态: {result.status}") return result.dict()
  • SSE variant of the remote_server_inspection handler. Performs basic SSH inspection for modules without detailed per-module error handling.
    def remote_server_inspection( hostname: str, username: str, password: str = "", port: int = 22, inspection_modules: list[str] = ["cpu", "memory", "disk"], timeout: int = 30, use_connection_cache: bool = True ) -> dict: """执行远程服务器巡检""" result = InspectionResult() logger.info(f"开始对 {hostname} 执行服务器巡检,模块: {inspection_modules}") # 定义命令映射,使用更高效的命令 commands = { "cpu": "top -bn1 | grep 'Cpu(s)' && uptime", "memory": "free -m", "disk": "df -h", # 添加更多模块的命令 "io": "iostat -x 1 2 | tail -n +4", "network": "netstat -i" } try: with SSHManager(hostname, username, password, port, timeout, use_connection_cache) as ssh: # 执行每个模块的命令并解析结果 for module in inspection_modules: if module in commands: stdin, stdout, stderr = ssh.exec_command(commands[module], timeout=timeout) output = stdout.read().decode().strip() error = stderr.read().decode().strip() if error: logger.warning(f"执行 {module} 命令时出现警告: {error}") # 存储原始输出 result.raw_outputs[module] = output # 解析结果 if module == "cpu": result.data[module] = ServerInspector.parse_cpu(output) elif module == "memory": result.data[module] = ServerInspector.parse_memory(output) elif module == "disk": result.data[module] = ServerInspector.parse_disk(output) # 可以添加更多模块的解析逻辑 else: logger.warning(f"未知的巡检模块: {module}") # 设置状态和汇总信息 result.status = "success" result.summary = f"完成对 {hostname} 的服务器巡检,检查了 {len(inspection_modules)} 个模块" except Exception as e: result.status = "error" result.error = str(e) logger.error(f"服务器巡检失败: {str(e)}") return result.dict()
  • JSON schema definition for the remote_server_inspection tool parameters used in list_available_tools.
    {"name": "remote_server_inspection", "description": "执行远程服务器巡检", "parameters": [ {"name": "hostname", "type": "str", "default": None}, {"name": "username", "type": "str", "default": None}, {"name": "password", "type": "str", "default": ""}, {"name": "port", "type": "int", "default": 22}, {"name": "inspection_modules", "type": "list[str]", "default": ["cpu", "memory", "disk"]}, {"name": "timeout", "type": "int", "default": 30} ]},
  • Registration of the tool in MCP server: included in tools_dict and decorated with mcp.tool().
    tools_dict = { 'get_memory_info': get_memory_info, 'remote_server_inspection': remote_server_inspection, 'get_system_load': get_system_load, 'monitor_processes': monitor_processes, 'check_service_status': check_service_status, 'get_os_details': get_os_details, 'check_ssh_risk_logins': check_ssh_risk_logins, 'check_firewall_config': check_firewall_config, 'security_vulnerability_scan': security_vulnerability_scan, 'backup_critical_files': backup_critical_files, 'inspect_network': inspect_network, 'analyze_logs': analyze_logs, 'list_docker_containers': list_docker_containers, 'list_docker_images': list_docker_images, 'list_docker_volumes': list_docker_volumes, 'get_container_logs': get_container_logs, 'monitor_container_stats': monitor_container_stats, 'check_docker_health': check_docker_health } # 使用装饰器动态注册所有工具 for name, func in tools_dict.items(): mcp.tool()(func)
  • Tool dispatch registration in SSE server: handles tool calls by name and invokes remote_server_inspection.
    if name == "remote_server_inspection": required_args = ["hostname", "username"] for arg in required_args: if arg not in arguments: raise ValueError(f"Missing required argument '{arg}'") result = remote_server_inspection( hostname=arguments["hostname"], username=arguments["username"], password=arguments.get("password", ""), port=arguments.get("port", 22), inspection_modules=arguments.get("inspection_modules", ["cpu", "memory", "disk"]), timeout=arguments.get("timeout", 30), use_connection_cache=arguments.get("use_connection_cache", True) )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server