Skip to main content
Glama
system_info.py19.6 kB
"""System information tools.""" import asyncio import os import platform import subprocess from datetime import datetime, timedelta from typing import Optional import psutil from .ssh_executor import execute_command from .utils import format_bytes async def get_system_info(host: Optional[str] = None, username: Optional[str] = None) -> str: """ Get basic system information. Args: host: Optional remote host to connect to username: Optional SSH username (required if host is provided) Returns: Formatted string with basic system information """ info = [] try: if host: # Remote execution - use Linux commands # Hostname returncode, stdout, _ = await execute_command( ["hostname"], host=host, username=username ) if returncode == 0 and stdout: info.append(f"Hostname: {stdout.strip()}") # OS Information from /etc/os-release returncode, stdout, _ = await execute_command( ["cat", "/etc/os-release"], host=host, username=username ) if returncode == 0 and stdout: os_info = {} for line in stdout.split('\n'): line = line.strip() if "=" in line: key, value = line.split("=", 1) os_info[key] = value.strip('"') info.append(f"Operating System: {os_info.get('PRETTY_NAME', 'Unknown')}") if 'VERSION_ID' in os_info: info.append(f"OS Version: {os_info['VERSION_ID']}") # Kernel version returncode, stdout, _ = await execute_command( ["uname", "-r"], host=host, username=username ) if returncode == 0 and stdout: info.append(f"Kernel Version: {stdout.strip()}") # Architecture returncode, stdout, _ = await execute_command( ["uname", "-m"], host=host, username=username ) if returncode == 0 and stdout: info.append(f"Architecture: {stdout.strip()}") # Uptime returncode, stdout, _ = await execute_command( ["uptime", "-p"], host=host, username=username ) if returncode == 0 and stdout: info.append(f"Uptime: {stdout.strip()}") # Boot time returncode, stdout, _ = await execute_command( ["uptime", "-s"], host=host, username=username ) if returncode == 0 and stdout: info.append(f"Boot Time: {stdout.strip()}") else: # Local execution - use psutil and platform # Hostname hostname = platform.node() info.append(f"Hostname: {hostname}") # OS Information if os.path.exists("/etc/os-release"): os_info = {} with open("/etc/os-release", "r") as f: for line in f: line = line.strip() if "=" in line: key, value = line.split("=", 1) os_info[key] = value.strip('"') info.append(f"Operating System: {os_info.get('PRETTY_NAME', 'Unknown')}") if 'VERSION_ID' in os_info: info.append(f"OS Version: {os_info['VERSION_ID']}") else: info.append(f"Operating System: {platform.system()} {platform.release()}") # Kernel version kernel = platform.release() info.append(f"Kernel Version: {kernel}") # Architecture arch = platform.machine() info.append(f"Architecture: {arch}") # Uptime boot_time = datetime.fromtimestamp(psutil.boot_time()) uptime = datetime.now() - boot_time days = uptime.days hours, remainder = divmod(uptime.seconds, 3600) minutes, seconds = divmod(remainder, 60) info.append(f"Uptime: {days}d {hours}h {minutes}m {seconds}s") info.append(f"Boot Time: {boot_time.strftime('%Y-%m-%d %H:%M:%S')}") return "\n".join(info) except Exception as e: return f"Error gathering system information: {str(e)}" async def get_cpu_info(host: Optional[str] = None, username: Optional[str] = None) -> str: """ Get CPU information. Args: host: Optional remote host to connect to username: Optional SSH username (required if host is provided) Returns: Formatted string with CPU information """ info = [] try: if host: # Remote execution - use Linux commands # Get CPU model from /proc/cpuinfo returncode, stdout, _ = await execute_command( ["grep", "-m", "1", "model name", "/proc/cpuinfo"], host=host, username=username ) if returncode == 0 and stdout: cpu_model = stdout.split(":", 1)[1].strip() if ":" in stdout else stdout.strip() info.append(f"CPU Model: {cpu_model}") # Get CPU core counts from /proc/cpuinfo returncode, stdout, _ = await execute_command( ["grep", "-c", "^processor", "/proc/cpuinfo"], host=host, username=username ) if returncode == 0 and stdout: logical_cores = int(stdout.strip()) info.append(f"CPU Logical Cores (threads): {logical_cores}") # Get physical cores (using core id uniqueness) returncode, stdout, _ = await execute_command( ["grep", "^core id", "/proc/cpuinfo"], host=host, username=username ) if returncode == 0 and stdout: core_ids = set(line.split(":", 1)[1].strip() for line in stdout.strip().split('\n') if ":" in line) physical_cores = len(core_ids) info.append(f"CPU Physical Cores: {physical_cores}") # Get CPU frequency from /proc/cpuinfo returncode, stdout, _ = await execute_command( ["grep", "-m", "1", "cpu MHz", "/proc/cpuinfo"], host=host, username=username ) if returncode == 0 and stdout and ":" in stdout: cpu_mhz = stdout.split(":", 1)[1].strip() info.append(f"CPU Frequency: Current={cpu_mhz}MHz") # Get load average returncode, stdout, _ = await execute_command( ["cat", "/proc/loadavg"], host=host, username=username ) if returncode == 0 and stdout: load_parts = stdout.strip().split() if len(load_parts) >= 3: info.append(f"\nLoad Average (1m, 5m, 15m): {load_parts[0]}, {load_parts[1]}, {load_parts[2]}") # Get CPU usage using top (one iteration) returncode, stdout, _ = await execute_command( ["top", "-bn1"], host=host, username=username ) if returncode == 0 and stdout: for line in stdout.split('\n'): if 'Cpu(s):' in line or '%Cpu' in line: info.append(f"\n{line.strip()}") break else: # Local execution - use psutil # CPU count physical_cores = psutil.cpu_count(logical=False) logical_cores = psutil.cpu_count(logical=True) info.append(f"CPU Physical Cores: {physical_cores}") info.append(f"CPU Logical Cores (threads): {logical_cores}") # CPU frequency try: cpu_freq = psutil.cpu_freq() if cpu_freq: info.append(f"CPU Frequency: Current={cpu_freq.current:.2f}MHz, Min={cpu_freq.min:.2f}MHz, Max={cpu_freq.max:.2f}MHz") except Exception: pass # CPU frequency might not be available # CPU usage per core cpu_percent = psutil.cpu_percent(interval=1, percpu=True) info.append(f"\nCPU Usage per Core:") for i, percent in enumerate(cpu_percent): info.append(f" Core {i}: {percent}%") # Overall CPU usage overall_cpu = psutil.cpu_percent(interval=1) info.append(f"\nOverall CPU Usage: {overall_cpu}%") # Load average load_avg = os.getloadavg() info.append(f"\nLoad Average (1m, 5m, 15m): {load_avg[0]:.2f}, {load_avg[1]:.2f}, {load_avg[2]:.2f}") # Try to get CPU model info from /proc/cpuinfo try: with open("/proc/cpuinfo", "r") as f: for line in f: if line.startswith("model name"): cpu_model = line.split(":")[1].strip() info.insert(0, f"CPU Model: {cpu_model}") break except Exception: pass return "\n".join(info) except Exception as e: return f"Error gathering CPU information: {str(e)}" async def get_memory_info(host: Optional[str] = None, username: Optional[str] = None) -> str: """ Get memory information. Args: host: Optional remote host to connect to username: Optional SSH username (required if host is provided) Returns: Formatted string with memory information """ info = [] try: if host: # Remote execution - use free command returncode, stdout, _ = await execute_command( ["free", "-b"], host=host, username=username ) if returncode == 0 and stdout: lines = stdout.strip().split('\n') # Parse memory line for line in lines: if line.startswith('Mem:'): parts = line.split() if len(parts) >= 7: total = int(parts[1]) used = int(parts[2]) free = int(parts[3]) available = int(parts[6]) if len(parts) > 6 else free percent = (used / total * 100) if total > 0 else 0 info.append("=== RAM Information ===") info.append(f"Total: {format_bytes(total)}") info.append(f"Available: {format_bytes(available)}") info.append(f"Used: {format_bytes(used)} ({percent:.1f}%)") info.append(f"Free: {format_bytes(free)}") elif line.startswith('Swap:'): parts = line.split() if len(parts) >= 4: total = int(parts[1]) used = int(parts[2]) free = int(parts[3]) percent = (used / total * 100) if total > 0 else 0 info.append("\n=== Swap Information ===") info.append(f"Total: {format_bytes(total)}") info.append(f"Used: {format_bytes(used)} ({percent:.1f}%)") info.append(f"Free: {format_bytes(free)}") else: # Local execution - use psutil # Virtual memory (RAM) mem = psutil.virtual_memory() info.append("=== RAM Information ===") info.append(f"Total: {format_bytes(mem.total)}") info.append(f"Available: {format_bytes(mem.available)}") info.append(f"Used: {format_bytes(mem.used)} ({mem.percent}%)") info.append(f"Free: {format_bytes(mem.free)}") if hasattr(mem, 'buffers'): info.append(f"Buffers: {format_bytes(mem.buffers)}") if hasattr(mem, 'cached'): info.append(f"Cached: {format_bytes(mem.cached)}") # Swap memory swap = psutil.swap_memory() info.append("\n=== Swap Information ===") info.append(f"Total: {format_bytes(swap.total)}") info.append(f"Used: {format_bytes(swap.used)} ({swap.percent}%)") info.append(f"Free: {format_bytes(swap.free)}") return "\n".join(info) except Exception as e: return f"Error gathering memory information: {str(e)}" async def get_disk_usage(host: Optional[str] = None, username: Optional[str] = None) -> str: """ Get disk usage information. Args: host: Optional remote host to connect to username: Optional SSH username (required if host is provided) Returns: Formatted string with disk usage information """ info = [] try: if host: # Remote execution - use df command returncode, stdout, _ = await execute_command( ["df", "-h", "--output=source,size,used,avail,pcent,target"], host=host, username=username ) if returncode == 0 and stdout: info.append("=== Filesystem Usage ===\n") info.append(stdout) else: # Fallback to basic df command returncode, stdout, _ = await execute_command( ["df", "-h"], host=host, username=username ) if returncode == 0 and stdout: info.append("=== Filesystem Usage ===\n") info.append(stdout) else: # Local execution - use psutil info.append("=== Filesystem Usage ===\n") info.append(f"{'Filesystem':<30} {'Size':<10} {'Used':<10} {'Avail':<10} {'Use%':<6} {'Mounted on'}") info.append("-" * 90) # Get all disk partitions partitions = psutil.disk_partitions(all=False) for partition in partitions: try: usage = psutil.disk_usage(partition.mountpoint) info.append( f"{partition.device:<30} " f"{format_bytes(usage.total):<10} " f"{format_bytes(usage.used):<10} " f"{format_bytes(usage.free):<10} " f"{usage.percent:<6.1f} " f"{partition.mountpoint}" ) except PermissionError: # Skip partitions we can't access continue except Exception as e: info.append(f"{partition.device:<30} Error: {str(e)}") # Disk I/O statistics try: disk_io = psutil.disk_io_counters() if disk_io: info.append("\n=== Disk I/O Statistics (since boot) ===") info.append(f"Read: {format_bytes(disk_io.read_bytes)}") info.append(f"Write: {format_bytes(disk_io.write_bytes)}") info.append(f"Read Count: {disk_io.read_count}") info.append(f"Write Count: {disk_io.write_count}") except Exception: pass # Disk I/O might not be available return "\n".join(info) except Exception as e: return f"Error gathering disk usage information: {str(e)}" async def get_hardware_info(host: Optional[str] = None, username: Optional[str] = None) -> str: """ Get hardware information. Args: host: Optional remote host to connect to username: Optional SSH username (required if host is provided) Returns: Formatted string with hardware information """ try: info = [] info.append("=== Hardware Information ===\n") # Try lscpu for CPU info try: returncode, stdout, stderr = await execute_command( ["lscpu"], host=host, username=username ) if returncode == 0: info.append("=== CPU Architecture (lscpu) ===") info.append(stdout) except FileNotFoundError: info.append("CPU info: lscpu command not available") # Try lspci for PCI devices try: returncode, stdout, stderr = await execute_command( ["lspci"], host=host, username=username ) if returncode == 0: pci_lines = stdout.strip().split('\n') info.append("\n=== PCI Devices ===") # Show first 50 devices to avoid overwhelming output for line in pci_lines[:50]: info.append(line) if len(pci_lines) > 50: info.append(f"\n... and {len(pci_lines) - 50} more PCI devices") except FileNotFoundError: info.append("\nPCI devices: lspci command not available") # Try lsusb for USB devices try: returncode, stdout, stderr = await execute_command( ["lsusb"], host=host, username=username ) if returncode == 0: info.append("\n\n=== USB Devices ===") info.append(stdout) except FileNotFoundError: info.append("\nUSB devices: lsusb command not available") # Memory hardware info from dmidecode (requires root) try: returncode, stdout, stderr = await execute_command( ["dmidecode", "-t", "memory"], host=host, username=username ) if returncode == 0: info.append("\n\n=== Memory Hardware (dmidecode) ===") info.append(stdout) elif "Permission denied" in stderr: info.append("\n\nMemory hardware info: Requires root privileges (dmidecode)") except FileNotFoundError: info.append("\nMemory hardware info: dmidecode command not available") if len(info) == 1: # Only the header info.append("No hardware information tools available.") return "\n".join(info) except Exception as e: return f"Error getting hardware information: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/narmaku/linux-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server