Skip to main content
Glama

Awesome Linux MCP Server

by aitesthaj
linux_mcp_server.pyโ€ข19.1 kB
#!/usr/bin/env python3 """ Linux MCP Server - Remote Linux Server Control and Browser Automation Provides AI models with secure SSH-based control over remote Linux servers and browser automation capabilities through Puppeteer. MVP Features: - SSH-based command execution on remote Linux servers - Bash script management and execution - File system operations - System monitoring (CPU, memory, disk) - Browser automation with Puppeteer - MCP resources for browser data """ import os import sys import logging import asyncio import json from typing import Optional, Dict, Any, List import paramiko import pyppeteer as pyppeteer import psutil from mcp.server.fastmcp import FastMCP # Configure logging to stderr logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stderr ) logger = logging.getLogger("linux-mcp-server") # Initialize MCP server mcp = FastMCP("linux-server") # Configuration from environment variables SSH_HOST = os.environ.get("SSH_HOST", "localhost") SSH_PORT = int(os.environ.get("SSH_PORT", "22")) SSH_USER = os.environ.get("SSH_USER", "") SSH_KEY_PATH = os.environ.get("SSH_KEY_PATH", "") SSH_PASSWORD = os.environ.get("SSH_PASSWORD", "") BROWSER_HEADLESS = os.environ.get("BROWSER_HEADLESS", "true").lower() == "true" LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") # Global variables for browser and resources browser = None page = None console_logs = [] network_requests = [] # === UTILITY FUNCTIONS === async def get_ssh_client() -> paramiko.SSHClient: """Create and return configured SSH client.""" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: if SSH_KEY_PATH and os.path.exists(SSH_KEY_PATH): client.connect( SSH_HOST, port=SSH_PORT, username=SSH_USER, key_filename=SSH_KEY_PATH, timeout=10 ) logger.info(f"Connected to {SSH_HOST}:{SSH_PORT} using SSH key") elif SSH_PASSWORD: client.connect( SSH_HOST, port=SSH_PORT, username=SSH_USER, password=SSH_PASSWORD, timeout=10 ) logger.info(f"Connected to {SSH_HOST}:{SSH_PORT} using password") else: raise Exception("No SSH authentication method configured") except Exception as e: logger.error(f"SSH connection failed: {str(e)}") raise Exception(f"SSH connection failed: {str(e)}") return client async def init_browser(): """Initialize Puppeteer browser if not already running.""" global browser, page, console_logs, network_requests if browser is None: logger.info("Initializing browser...") browser = await pyppeteer.launch( headless=BROWSER_HEADLESS, args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage'] ) page = await browser.newPage() # Set up console logging console_logs = [] def handle_console(msg): log_entry = f"[{msg.type.upper()}] {msg.text}" console_logs.append(log_entry) logger.debug(f"Browser console: {log_entry}") page.on('console', handle_console) # Set up network monitoring network_requests = [] def handle_request(request): network_requests.append({ 'method': request.method, 'url': request.url, 'headers': dict(request.headers), 'timestamp': asyncio.get_event_loop().time() }) def handle_response(response): # Update the last request with response info if network_requests and network_requests[-1]['url'] == response.url: network_requests[-1]['status'] = response.status network_requests[-1]['response_headers'] = dict(response.headers) page.on('request', handle_request) page.on('response', handle_response) logger.info("Browser initialized successfully") return page async def close_browser(): """Close the browser if running.""" global browser, page if browser: await browser.close() browser = None page = None logger.info("Browser closed") # === MCP TOOLS === @mcp.tool() async def execute_command(command: str = "") -> str: """Execute a single terminal command securely via SSH on remote Linux server.""" if not command.strip(): return "โŒ Error: Command parameter is required" if not SSH_USER: return "โŒ Error: SSH_USER environment variable not set" try: logger.info(f"Executing command: {command}") client = await get_ssh_client() # Execute command with timeout stdin, stdout, stderr = client.exec_command(command, timeout=30) # Read output output = stdout.read().decode('utf-8', errors='ignore') error = stderr.read().decode('utf-8', errors='ignore') exit_code = stdout.channel.recv_exit_status() client.close() result = f"๐Ÿ“‹ Command: {command}\n" result += f"๐Ÿ“Š Exit Code: {exit_code}\n" if output.strip(): result += f"โœ… Output:\n{output}" if error.strip(): result += f"โš ๏ธ Error Output:\n{error}" if not output.strip() and not error.strip(): result += "โ„น๏ธ Command executed successfully (no output)" return result except Exception as e: logger.error(f"Error executing command: {e}") return f"โŒ Error executing command: {str(e)}" @mcp.tool() async def execute_script(script_path: str = "", args: str = "") -> str: """Execute a bash script on the remote server with optional arguments.""" if not script_path.strip(): return "โŒ Error: Script path parameter is required" try: # Build the command to execute the script command = f"bash {script_path}" if args.strip(): command += f" {args}" logger.info(f"Executing script: {command}") return await execute_command(command) except Exception as e: logger.error(f"Error executing script: {e}") return f"โŒ Error executing script: {str(e)}" @mcp.tool() async def create_bash_script(filename: str = "", content: str = "") -> str: """Create a new bash script on the remote server.""" if not filename.strip(): return "โŒ Error: Filename parameter is required" if not content.strip(): return "โŒ Error: Content parameter is required" try: logger.info(f"Creating script: {filename}") # Ensure the script has a shebang if not content.startswith("#!"): script_content = f"#!/bin/bash\n{content}" else: script_content = content # Write to remote file via SFTP client = await get_ssh_client() sftp = client.open_sftp() with sftp.file(filename, 'w') as f: f.write(script_content) # Make executable client.exec_command(f"chmod +x {filename}") sftp.close() client.close() return f"โœ… Script created successfully: {filename}\n๐Ÿ“„ Content:\n{script_content[:200]}{'...' if len(script_content) > 200 else ''}" except Exception as e: logger.error(f"Error creating script: {e}") return f"โŒ Error creating script: {str(e)}" @mcp.tool() async def list_directory(path: str = ".") -> str: """List directory contents on the remote server with detailed information.""" try: command = f"ls -la {path}" logger.info(f"Listing directory: {path}") result = await execute_command(command) return result except Exception as e: logger.error(f"Error listing directory: {e}") return f"โŒ Error listing directory: {str(e)}" @mcp.tool() async def get_system_info() -> str: """Get comprehensive system information from the remote server.""" if not SSH_USER: return "โŒ Error: SSH_USER environment variable not set" try: logger.info("Getting system information") client = await get_ssh_client() # Get CPU info stdin, stdout, stderr = client.exec_command("top -bn1 | grep 'Cpu(s)' | head -1") cpu_info = stdout.read().decode('utf-8', errors='ignore').strip() # Get memory info stdin, stdout, stderr = client.exec_command("free -h | grep -E '^(Mem|Swap)'") mem_info = stdout.read().decode('utf-8', errors='ignore').strip() # Get disk usage stdin, stdout, stderr = client.exec_command("df -h | head -10") disk_info = stdout.read().decode('utf-8', errors='ignore').strip() # Get system info stdin, stdout, stderr = client.exec_command("uname -a && uptime") sys_info = stdout.read().decode('utf-8', errors='ignore').strip() # Get load average stdin, stdout, stderr = client.exec_command("cat /proc/loadavg") load_avg = stdout.read().decode('utf-8', errors='ignore').strip() client.close() result = "๐Ÿ“Š System Information:\n\n" result += f"๐Ÿ–ฅ๏ธ System: {sys_info}\n\n" result += f"โšก CPU Usage: {cpu_info}\n\n" result += f"๐Ÿง  Memory: \n{mem_info}\n\n" result += f"๐Ÿ’พ Disk Usage: \n{disk_info}\n\n" result += f"๐Ÿ“ˆ Load Average: {load_avg}" return result except Exception as e: logger.error(f"Error getting system info: {e}") return f"โŒ Error getting system info: {str(e)}" @mcp.tool() async def puppeteer_navigate(url: str = "") -> str: """Navigate browser to specified URL.""" if not url.strip(): return "โŒ Error: URL parameter is required" try: logger.info(f"Navigating to: {url}") page = await init_browser() # Navigate with timeout await page.goto(url, {'waitUntil': 'networkidle0', 'timeout': 30000}) title = await page.title() url = page.url return f"โœ… Navigated to: {url}\n๐Ÿ“„ Page Title: {title}" except Exception as e: logger.error(f"Error navigating to URL: {e}") return f"โŒ Error navigating to URL: {str(e)}" @mcp.tool() async def puppeteer_screenshot(name: str = "", selector: str = "", width: str = "800", height: str = "600") -> str: """Capture screenshot of page or specific element.""" if not name.strip(): return "โŒ Error: Name parameter is required" try: logger.info(f"Capturing screenshot: {name}") page = await init_browser() # Convert string parameters try: width_int = int(width) if width.strip() else 800 height_int = int(height) if height.strip() else 600 except ValueError: return "โŒ Error: Width and height must be valid integers" await page.setViewport({'width': width_int, 'height': height_int}) filename = f"{name}.png" if selector.strip(): element = await page.querySelector(selector) if element: await element.screenshot({'path': filename}) return f"โœ… Element screenshot captured: {filename} ({width_int}x{height_int})" else: return f"โŒ Error: Element not found: {selector}" else: await page.screenshot({'path': filename, 'fullPage': True}) return f"โœ… Full page screenshot captured: {filename} ({width_int}x{height_int})" except Exception as e: logger.error(f"Error capturing screenshot: {e}") return f"โŒ Error capturing screenshot: {str(e)}" @mcp.tool() async def puppeteer_click(selector: str = "") -> str: """Click on a web page element.""" if not selector.strip(): return "โŒ Error: Selector parameter is required" try: logger.info(f"Clicking element: {selector}") page = await init_browser() # Wait for element and click await page.waitForSelector(selector, {'timeout': 5000}) await page.click(selector, {'delay': 100}) return f"โœ… Clicked element: {selector}" except Exception as e: logger.error(f"Error clicking element: {e}") return f"โŒ Error clicking element: {str(e)}" @mcp.tool() async def puppeteer_hover(selector: str = "") -> str: """Hover over a web page element.""" if not selector.strip(): return "โŒ Error: Selector parameter is required" try: logger.info(f"Hovering over element: {selector}") page = await init_browser() # Wait for element and hover await page.waitForSelector(selector, {'timeout': 5000}) await page.hover(selector) return f"โœ… Hovered over element: {selector}" except Exception as e: logger.error(f"Error hovering element: {e}") return f"โŒ Error hovering element: {str(e)}" @mcp.tool() async def puppeteer_fill(selector: str = "", value: str = "") -> str: """Fill a form input field.""" if not selector.strip(): return "โŒ Error: Selector parameter is required" if value is None: return "โŒ Error: Value parameter is required" try: logger.info(f"Filling input {selector} with: {value}") page = await init_browser() # Wait for element, clear it, and type await page.waitForSelector(selector, {'timeout': 5000}) await page.evaluate(f"document.querySelector('{selector}').value = ''") await page.type(selector, value, {'delay': 50}) return f"โœ… Filled input {selector} with: {value}" except Exception as e: logger.error(f"Error filling input: {e}") return f"โŒ Error filling input: {str(e)}" @mcp.tool() async def puppeteer_select(selector: str = "", value: str = "") -> str: """Select an option from a dropdown menu.""" if not selector.strip(): return "โŒ Error: Selector parameter is required" if not value: return "โŒ Error: Value parameter is required" try: logger.info(f"Selecting {value} from {selector}") page = await init_browser() # Wait for element and select option await page.waitForSelector(selector, {'timeout': 5000}) await page.select(selector, value) return f"โœ… Selected {value} from {selector}" except Exception as e: logger.error(f"Error selecting option: {e}") return f"โŒ Error selecting option: {str(e)}" @mcp.tool() async def puppeteer_evaluate(script: str = "") -> str: """Execute JavaScript in the browser context.""" if not script.strip(): return "โŒ Error: Script parameter is required" try: logger.info("Executing JavaScript in browser") page = await init_browser() result = await page.evaluate(script) return f"โœ… JavaScript executed successfully\n๐Ÿ“„ Result: {result}" except Exception as e: logger.error(f"Error executing JavaScript: {e}") return f"โŒ Error executing JavaScript: {str(e)}" @mcp.tool() async def puppeteer_get_console_logs() -> str: """Get browser console logs.""" try: global console_logs logger.info("Retrieving console logs") if not console_logs: return "๐Ÿ“‹ Console Logs: No logs captured yet" logs_text = "\n".join(console_logs[-20:]) # Last 20 logs return f"๐Ÿ“‹ Console Logs (last 20):\n{logs_text}" except Exception as e: logger.error(f"Error getting console logs: {e}") return f"โŒ Error getting console logs: {str(e)}" @mcp.tool() async def puppeteer_get_screenshot() -> str: """Get current browser screenshot as base64.""" try: logger.info("Getting browser screenshot") page = await init_browser() screenshot = await page.screenshot({'encoding': 'base64'}) return f"๐Ÿ“ธ Screenshot captured (base64 - first 100 chars): {screenshot[:100]}..." except Exception as e: logger.error(f"Error getting screenshot: {e}") return f"โŒ Error getting screenshot: {str(e)}" @mcp.tool() async def puppeteer_get_network_details() -> str: """Get network request/response details since session started.""" try: global network_requests logger.info("Retrieving network details") if not network_requests: return "๐ŸŒ Network Details: No requests captured yet" # Get last 10 requests recent_requests = network_requests[-10:] details = [] for req in recent_requests: status = req.get('status', 'pending') method = req.get('method', 'UNKNOWN') url = req.get('url', 'UNKNOWN') details.append(f"{method} {url} - {status}") return f"๐ŸŒ Network Details (last 10 requests):\n" + "\n".join(details) except Exception as e: logger.error(f"Error getting network details: {e}") return f"โŒ Error getting network details: {str(e)}" # === MCP RESOURCES === @mcp.resource("console://logs") async def get_console_logs_resource() -> str: """Browser console logs resource.""" try: global console_logs if not console_logs: return "No console logs captured yet" return "\n".join(console_logs) except Exception as e: return f"Error retrieving console logs: {str(e)}" @mcp.resource("screenshot://current") async def get_screenshot_resource() -> str: """Current browser screenshot resource.""" try: page = await init_browser() screenshot = await page.screenshot({'encoding': 'base64'}) return f"data:image/png;base64,{screenshot}" except Exception as e: return f"Error capturing screenshot: {str(e)}" @mcp.resource("network://details") async def get_network_details_resource() -> str: """Network request/response details resource.""" try: global network_requests if not network_requests: return "No network requests captured yet" # Return as JSON for structured data return json.dumps(network_requests[-20:], indent=2) # Last 20 requests except Exception as e: return f"Error retrieving network details: {str(e)}" # === SERVER STARTUP === async def main(): """Main entry point for the MCP server.""" logger.info("Starting Linux MCP server...") # Validate configuration if not SSH_USER: logger.warning("SSH_USER not set - SSH tools will not work") if not SSH_KEY_PATH and not SSH_PASSWORD: logger.warning("Neither SSH_KEY_PATH nor SSH_PASSWORD set - SSH authentication may fail") try: # Start the MCP server await mcp.run_async(transport='stdio') except KeyboardInterrupt: logger.info("Server interrupted by user") except Exception as e: logger.error(f"Server error: {e}", exc_info=True) sys.exit(1) finally: # Cleanup await close_browser() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aitesthaj/AWESOME_LINUX_MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server