Skip to main content
Glama

OPS MCP Server

by Heht571
system_tools.py11.5 kB
"""系统相关工具函数""" import psutil import re from server_monitor.models.schemas import InspectionResult from server_monitor.core.ssh_manager import SSHManager from server_monitor.core.inspector import ServerInspector from server_monitor.utils.decorators import handle_exceptions from server_monitor.config.logger import logger @handle_exceptions def get_memory_info() -> dict: """获取本地服务器内存信息""" mem = psutil.virtual_memory() return { "status": "success", "total": mem.total, "used": mem.used, "free": mem.free, "usage": mem.percent, "available": mem.available, "cached": getattr(mem, 'cached', 0), "buffers": getattr(mem, 'buffers', 0) } @handle_exceptions def remote_server_inspection( hostname: str, username: str, password: str = "", port: int = 22, inspection_modules: list[str] = ["cpu", "memory", "disk"], timeout: int = 30, use_connection_cache: bool = True ) -> dict: """执行远程服务器巡检""" result = InspectionResult() logger.info(f"开始对 {hostname} 执行服务器巡检,模块: {inspection_modules}") # 定义命令映射,使用更高效的命令 commands = { "cpu": "top -bn1 | grep 'Cpu(s)' && uptime", "memory": "free -m", "disk": "df -h", # 添加更多模块的命令 "io": "iostat -x 1 2 | tail -n +4", "network": "netstat -i" } # 验证模块 valid_modules = [m for m in inspection_modules if m in commands] if len(valid_modules) != len(inspection_modules): invalid_modules = set(inspection_modules) - set(valid_modules) logger.warning(f"忽略无效的巡检模块: {invalid_modules}") # 如果没有有效模块,提前返回 if not valid_modules: result.status = "error" result.error = "没有有效的巡检模块" result.summary = "巡检失败:没有有效的巡检模块" return result.dict() # 使用优化的SSH连接管理器 with SSHManager(hostname, username, password, port, timeout, use_cache=use_connection_cache) as ssh: # 并行执行所有命令 module_results = {} for module in valid_modules: try: # 执行命令 stdin, stdout, stderr = ssh.exec_command(commands[module], timeout=timeout) raw_output = stdout.read().decode().strip() error_output = stderr.read().decode().strip() if error_output: logger.warning(f"模块 {module} 执行时有错误输出: {error_output}") result.raw_outputs[module] = raw_output # 解析结果 if not raw_output: logger.warning(f"模块 {module} 没有输出") continue # 使用模式匹配解析不同模块的输出 match module: case "cpu": result.data[module] = ServerInspector.parse_cpu(raw_output) case "memory": result.data[module] = ServerInspector.parse_memory(raw_output).dict() case "disk": result.data[module] = ServerInspector.parse_disk(raw_output) case "io": # 这里可以添加IO解析逻辑 pass case "network": # 这里可以添加网络解析逻辑 pass module_results[module] = "success" except Exception as e: logger.error(f"模块 {module} 执行失败: {str(e)}") module_results[module] = f"failed: {str(e)}" # 生成摘要 success_modules = [m for m, status in module_results.items() if status == "success"] failed_modules = [m for m, status in module_results.items() if status != "success"] if failed_modules: if success_modules: result.summary = f"部分模块巡检成功 ({len(success_modules)}/{len(valid_modules)})" else: result.summary = "所有模块巡检失败" else: result.summary = "服务器巡检成功" result.status = "error" if not success_modules else "success" logger.info(f"完成对 {hostname} 的服务器巡检,状态: {result.status}") return result.dict() @handle_exceptions def get_system_load( hostname: str, username: str, password: str = "", port: int = 22, timeout: int = 30 ) -> dict: """获取系统负载信息""" try: with SSHManager(hostname, username, password, port, timeout) as ssh: stdin, stdout, stderr = ssh.exec_command("uptime") load_output = stdout.read().decode().strip() load_avg = re.search(r'load average: (.*)', load_output) return {"status": "success", "load_average": load_avg.group(1) if load_avg else "unknown"} except Exception as e: return {"status": "error", "error": str(e)} @handle_exceptions def monitor_processes( hostname: str, username: str, password: str = "", port: int = 22, top_n: int = 10, sort_by: str = "cpu", timeout: int = 30 ) -> dict: """监控远程服务器进程,返回占用资源最多的进程""" result = {"status": "unknown", "processes": [], "error": ""} sort_options = { "cpu": "-pcpu", "memory": "-pmem", "time": "-time" } sort_param = sort_options.get(sort_by, "-pcpu") try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 使用ps命令获取进程信息,并按指定条件排序 command = f"ps aux --sort={sort_param} | head -n {top_n + 1}" # +1 是为了包含标题行 stdin, stdout, stderr = ssh.exec_command(command, timeout=timeout) raw_output = stdout.read().decode().strip() # 解析进程信息 result["processes"] = ServerInspector.parse_processes(raw_output) result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result @handle_exceptions def check_service_status( hostname: str, username: str, password: str = "", port: int = 22, services: list[str] = [], timeout: int = 30 ) -> dict: """检查指定服务的运行状态""" result = {"status": "unknown", "services": [], "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: if services: # 检查特定服务 service_statuses = [] for service in services: command = f"systemctl status {service}" stdin, stdout, stderr = ssh.exec_command(command, timeout=timeout) output = stdout.read().decode().strip() # 分析输出判断服务状态 service_status = { "name": service, "status": "unknown", "active": False, "enabled": False } if "Active: active" in output: service_status["status"] = "running" service_status["active"] = True elif "Active: inactive" in output: service_status["status"] = "stopped" elif "not-found" in output or "could not be found" in output: service_status["status"] = "not found" # 检查是否开机启动 enabled_command = f"systemctl is-enabled {service}" stdin, stdout, stderr = ssh.exec_command(enabled_command, timeout=timeout) enabled_output = stdout.read().decode().strip() service_status["enabled"] = enabled_output == "enabled" service_statuses.append(service_status) result["services"] = service_statuses else: # 列出所有活跃的服务 command = "systemctl list-units --type=service --state=running" stdin, stdout, stderr = ssh.exec_command(command, timeout=timeout) raw_output = stdout.read().decode().strip() # 解析服务状态 result["services"] = ServerInspector.parse_services(raw_output) result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result @handle_exceptions def get_os_details( hostname: str, username: str, password: str = "", port: int = 22, timeout: int = 30 ) -> dict: """获取操作系统详细信息""" result = {"status": "unknown", "os_info": {}, "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 收集各种系统信息 commands = { "hostname": "hostname", "os_release": "cat /etc/os-release || cat /etc/redhat-release || cat /etc/debian_version || uname -a", "kernel": "uname -r", "architecture": "uname -m", "uptime": "uptime -p", "last_boot": "who -b" } os_info = {} for key, command in commands.items(): stdin, stdout, stderr = ssh.exec_command(command, timeout=timeout) output = stdout.read().decode().strip() os_info[key] = output # 解析OS分发版和版本 distro = "Unknown" version = "Unknown" if "NAME=" in os_info["os_release"]: distro_match = re.search(r'NAME="?(.*?)"?', os_info["os_release"], re.MULTILINE) if distro_match: distro = distro_match.group(1) version_match = re.search(r'VERSION="?(.*?)"?', os_info["os_release"], re.MULTILINE) if version_match: version = version_match.group(1) else: version_id_match = re.search(r'VERSION_ID="?(.*?)"?', os_info["os_release"], re.MULTILINE) if version_id_match: version = version_id_match.group(1) os_info["distro"] = distro os_info["version"] = version # 检查是否为虚拟机 vm_check_command = "systemd-detect-virt || dmesg | grep -i virtual || dmidecode | grep -i vmware || dmidecode | grep -i virtualbox || echo 'Unknown'" stdin, stdout, stderr = ssh.exec_command(vm_check_command, timeout=timeout) vm_output = stdout.read().decode().strip() os_info["virtualization"] = "Unknown" if vm_output != "Unknown": for vm_type in ["kvm", "vmware", "virtualbox", "xen", "docker", "lxc", "openvz", "parallels"]: if vm_type.lower() in vm_output.lower(): os_info["virtualization"] = vm_type break result["os_info"] = os_info result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server