Skip to main content
Glama

OPS MCP Server

by Heht571
network_tools.py5.27 kB
"""网络相关工具函数""" from core.ssh_manager import SSHManager from core.inspector import ServerInspector from utils.decorators import handle_exceptions from config.logger import logger @handle_exceptions def inspect_network( hostname: str, username: str, password: str = "", port: int = 22, timeout: int = 30 ) -> dict: """检查网络接口和连接状态""" result = {"status": "unknown", "interfaces": [], "connections": {}, "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 获取网络接口信息 interfaces_command = "ip a" stdin, stdout, stderr = ssh.exec_command(interfaces_command, timeout=timeout) interfaces_output = stdout.read().decode().strip() # 解析网络接口信息 result["interfaces"] = ServerInspector.parse_network_interfaces(interfaces_output) # 获取网络连接信息 connections_command = "ss -tuln" stdin, stdout, stderr = ssh.exec_command(connections_command, timeout=timeout) connections_output = stdout.read().decode().strip() # 解析监听端口 listening_ports = [] for line in connections_output.split('\n')[1:]: # 跳过标题行 if "LISTEN" in line: parts = line.split() if len(parts) >= 5: address_port = parts[4] if ":" in address_port: port = address_port.split(":")[-1] listening_ports.append(port) result["connections"]["listening_ports"] = listening_ports # 检查是否可以连接公网 internet_check = ssh.exec_command("ping -c 1 -W 2 8.8.8.8", timeout=timeout) internet_output = internet_check[1].read().decode().strip() result["connections"]["internet_connectivity"] = "1 received" in internet_output result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result @handle_exceptions def analyze_logs( hostname: str, username: str, password: str = "", port: int = 22, log_file: str = "/var/log/syslog", pattern: str = "error|fail|critical", lines: int = 100, timeout: int = 30 ) -> dict: """分析服务器日志文件中的错误和警告""" result = {"status": "unknown", "entries": [], "summary": {}, "error": ""} try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 获取日志的最后几行 tail_command = f"tail -n {lines} {log_file}" stdin, stdout, stderr = ssh.exec_command(tail_command, timeout=timeout) log_output = stdout.read().decode().strip() if not log_output: result["error"] = f"无法读取日志文件 {log_file}" result["status"] = "error" return result # 使用grep过滤包含指定模式的行 grep_command = f"echo '{log_output}' | grep -E '{pattern}'" stdin, stdout, stderr = ssh.exec_command(grep_command, timeout=timeout) matched_output = stdout.read().decode().strip() # 初始化计数器 pattern_counts = { "critical": 0, "error": 0, "warning": 0, "fail": 0, "other": 0 } entries = [] for line in matched_output.split('\n'): if not line: continue # 尝试提取时间戳 timestamp = "" try: # 假设日志的前部分是时间戳 timestamp_part = ' '.join(line.split()[:3]) timestamp = timestamp_part except: pass # 确定日志级别 level = "other" line_lower = line.lower() if "critical" in line_lower: level = "critical" pattern_counts["critical"] += 1 elif "error" in line_lower: level = "error" pattern_counts["error"] += 1 elif "warning" in line_lower or "warn" in line_lower: level = "warning" pattern_counts["warning"] += 1 elif "fail" in line_lower: level = "fail" pattern_counts["fail"] += 1 else: pattern_counts["other"] += 1 entries.append({ "timestamp": timestamp, "level": level, "message": line }) result["entries"] = entries result["summary"] = { "total_entries": len(entries), "counts_by_level": pattern_counts } result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Heht571/ops-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server