Skip to main content
Glama

OPS MCP Server

by Heht571
system_tools.py7.18 kB
"""系统相关工具函数""" import psutil import re from models.schemas import InspectionResult from core.ssh_manager import SSHManager from core.inspector import ServerInspector from utils.decorators import handle_exceptions from config.logger import logger @handle_exceptions def get_memory_info() -> dict: """获取本地服务器内存信息""" mem = psutil.virtual_memory() return { "status": "success", "total": mem.total, "used": mem.used, "free": mem.free, "usage": mem.percent, "available": mem.available, "cached": getattr(mem, 'cached', 0), "buffers": getattr(mem, 'buffers', 0) } @handle_exceptions def remote_server_inspection( hostname: str, username: str, password: str = "", port: int = 22, inspection_modules: list[str] = ["cpu", "memory", "disk"], timeout: int = 30, use_connection_cache: bool = True ) -> dict: """执行远程服务器巡检""" result = InspectionResult() logger.info(f"开始对 {hostname} 执行服务器巡检,模块: {inspection_modules}") # 定义命令映射,使用更高效的命令 commands = { "cpu": "top -bn1 | grep 'Cpu(s)' && uptime", "memory": "free -m", "disk": "df -h", # 添加更多模块的命令 "io": "iostat -x 1 2 | tail -n +4", "network": "netstat -i" } try: with SSHManager(hostname, username, password, port, timeout, use_connection_cache) as ssh: # 执行每个模块的命令并解析结果 for module in inspection_modules: if module in commands: stdin, stdout, stderr = ssh.exec_command(commands[module], timeout=timeout) output = stdout.read().decode().strip() error = stderr.read().decode().strip() if error: logger.warning(f"执行 {module} 命令时出现警告: {error}") # 存储原始输出 result.raw_outputs[module] = output # 解析结果 if module == "cpu": result.data[module] = ServerInspector.parse_cpu(output) elif module == "memory": result.data[module] = ServerInspector.parse_memory(output) elif module == "disk": result.data[module] = ServerInspector.parse_disk(output) # 可以添加更多模块的解析逻辑 else: logger.warning(f"未知的巡检模块: {module}") # 设置状态和汇总信息 result.status = "success" result.summary = f"完成对 {hostname} 的服务器巡检,检查了 {len(inspection_modules)} 个模块" except Exception as e: result.status = "error" result.error = str(e) logger.error(f"服务器巡检失败: {str(e)}") return result.dict() @handle_exceptions def get_system_load( hostname: str, username: str, password: str = "", port: int = 22, timeout: int = 30 ) -> dict: """获取系统负载信息""" try: with SSHManager(hostname, username, password, port, timeout) as ssh: stdin, stdout, stderr = ssh.exec_command("uptime") load_output = stdout.read().decode().strip() load_avg = re.search(r'load average: (.*)', load_output) return {"status": "success", "load_average": load_avg.group(1) if load_avg else "unknown"} except Exception as e: return {"status": "error", "error": str(e)} @handle_exceptions def monitor_processes( hostname: str, username: str, password: str = "", port: int = 22, top_n: int = 10, sort_by: str = "cpu", timeout: int = 30 ) -> dict: """监控远程服务器进程,返回占用资源最多的进程""" result = {"status": "unknown", "processes": [], "error": ""} sort_options = { "cpu": "-pcpu", "memory": "-pmem", "time": "-time" } sort_param = sort_options.get(sort_by, "-pcpu") try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 使用ps命令获取进程信息,并按指定条件排序 command = f"ps aux --sort={sort_param} | head -n {top_n + 1}" # +1 是为了包含标题行 stdin, stdout, stderr = ssh.exec_command(command, timeout=timeout) raw_output = stdout.read().decode().strip() # 解析进程信息 result["processes"] = ServerInspector.parse_processes(raw_output) result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result @handle_exceptions def check_service_status( hostname: str, username: str, password: str = "", port: int = 22, service_names: list[str] = [], # 为空则检查所有服务 timeout: int = 30 ) -> dict: """检查服务状态""" result = {"status": "unknown", "services": [], "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 构建命令 if service_names: # 检查指定服务 services_str = " ".join(service_names) command = f"systemctl status {services_str}" else: # 列出所有服务 command = "systemctl list-units --type=service --all" stdin, stdout, stderr = ssh.exec_command(command, timeout=timeout) raw_output = stdout.read().decode().strip() # 解析服务状态 result["services"] = ServerInspector.parse_services(raw_output) result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result @handle_exceptions def get_os_details( hostname: str, username: str, password: str = "", port: int = 22, timeout: int = 30 ) -> dict: """获取操作系统详细信息""" result = {"status": "unknown", "os_info": {}, "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 收集各种系统信息 commands = { "hostname": "hostname", "os_release": "cat /etc/os-release || cat /etc/redhat-release || cat /etc/debian_version || uname -a", "kernel": "uname -r", "architecture": "uname -m", "uptime": "uptime -p", "last_boot": "who -b" } os_info = {} for key, command in commands.items(): stdin, stdout, stderr = ssh.exec_command(command, timeout=timeout) output = stdout.read().decode().strip() os_info[key] = output result["os_info"] = os_info result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server