Skip to main content
Glama

SSH Read-Only MCP Server

by kunwarmahen
ssh_readonly_fastmcp_mcast.py•11.8 kB
#!/usr/bin/env python3 """ SSH Read-Only FastMCP Server with Multicast Discovery A FastMCP implementation for secure SSH connections with read-only command execution. Install with: pip install fastmcp paramiko python-dotenv """ import os import json import socket import uuid import threading import time import logging from typing import Any, Dict from dotenv import load_dotenv import paramiko from fastmcp import FastMCP from mcp.types import TextContent # Load environment variables load_dotenv() # Configure logging logging.basicConfig( filename="ssh_mcp.log", level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", ) logger = logging.getLogger("ssh_mcp") # --- Configuration --- transport = os.getenv("MCP_TRANSPORT", "stdio") server_name = os.getenv("MCP_SERVER_NAME", "SSH Read-Only MCP Server") enable_broadcast = os.getenv("MCP_ENABLE_BROADCAST", "true").lower() == "true" broadcast_interval = int(os.getenv("MCP_BROADCAST_INTERVAL", "30")) # Multicast settings SSDP_ADDR = "239.255.255.250" MCP_DISCOVERY_PORT = 5353 # Initialize FastMCP if transport == "http" or transport == "streamable-http": host = os.getenv("MCP_HOST", "0.0.0.0") port = int(os.getenv("MCP_PORT", "3000")) logger.info(f"Starting MCP server in {transport} mode at {host}:{port}") mcp = FastMCP(name=server_name, host=host, port=port, debug=True) else: mcp = FastMCP(name=server_name) # --- Multicast Broadcaster --- class MCPBroadcaster: """Broadcasts MCP server presence on multicast""" def __init__(self, server_name: str, port: int, transport: str): self.server_name = server_name self.port = port self.transport = transport self.uuid = str(uuid.uuid4()) self.running = False self.sock = None def get_local_ip(self): """Get local IP address""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] s.close() return ip except: return '127.0.0.1' def create_announcement(self): """Create MCP discovery announcement""" local_ip = self.get_local_ip() announcement = { "type": "mcp-announcement", "protocol": "MCP-DISCOVERY-v1", "uuid": self.uuid, "name": self.server_name, "host": local_ip, "port": self.port, "endpoint": "/mcp", "protocol_type": "MCP-HTTP", "transport": self.transport, "version": "1.0.0" } return json.dumps(announcement).encode('utf-8') def start_broadcasting(self): """Start broadcasting server presence""" if not enable_broadcast: logger.info("Broadcasting disabled") return self.running = True # Create UDP socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) logger.info(f"Starting multicast broadcaster on {SSDP_ADDR}:{MCP_DISCOVERY_PORT}") logger.info(f"Broadcasting every {broadcast_interval} seconds") print(f"šŸ“» Broadcasting {self.server_name} on {SSDP_ADDR}:{MCP_DISCOVERY_PORT}") while self.running: try: message = self.create_announcement() self.sock.sendto(message, (SSDP_ADDR, MCP_DISCOVERY_PORT)) logger.debug(f"Broadcast sent: {message.decode('utf-8')}") except Exception as e: logger.error(f"Broadcast error: {e}") # Sleep in small intervals so shutdown is responsive for _ in range(broadcast_interval * 10): if not self.running: break time.sleep(0.1) def stop_broadcasting(self): """Stop broadcasting""" self.running = False if self.sock: self.sock.close() logger.info("Broadcasting stopped") # List of allowed read-only commands ALLOWED_COMMANDS = { 'cat', 'ls', 'pwd', 'whoami', 'id', 'date', 'uptime', 'ps', 'top', 'df', 'du', 'free', 'netstat', 'ss', 'ifconfig', 'ip', 'hostname', 'uname', 'lsb_release', 'file', 'head', 'tail', 'wc', 'grep', 'find', 'locate', 'which', 'whereis', 'stat', 'lsof', 'mount', 'dmidecode', 'lscpu', 'lsblk', 'fdisk', 'blkid', 'journalctl', 'systemctl', 'service', 'curl', 'wget', 'dig', 'nslookup', 'ping', 'traceroute', 'mtr', 'iptables', 'firewall-cmd', 'ufw', 'awk', 'sed' } # Global SSH client storage ssh_clients = {} def is_command_safe(command: str) -> bool: """Verify that the command is read-only safe.""" cmd_parts = command.strip().split() if not cmd_parts: return False base_cmd = cmd_parts[0].split('/')[-1] # Get the command name without path # Block dangerous patterns dangerous_patterns = [ '>', '<', '|', '&', ';', '$(', '`', 'rm', 'dd', 'mkfs', 'chmod', 'chown', 'mv', 'cp', 'touch', 'mkdir', 'rmdir', 'kill', 'shutdown', 'reboot', 'halt', 'poweroff', 'su', 'sudo', 'passwd', 'usermod', 'useradd', 'userdel', 'nano', 'vi', 'vim', 'ed' ] # Check for dangerous patterns in command for pattern in dangerous_patterns: if pattern in command: return False # Check if base command is in allowed list return base_cmd in ALLOWED_COMMANDS @mcp.tool() def ssh_connect( host: str, username: str, port: int = 22, key_filename: str = None, password: str = None ) -> str: """ Establish SSH connection to a remote machine (read-only access only). Args: host: Remote host IP address or hostname username: SSH username port: SSH port (default: 22) key_filename: Path to private key file (recommended) password: SSH password (fallback if no key) Returns: Connection status message """ try: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if key_filename: client.connect(host, port=port, username=username, key_filename=key_filename, timeout=10) elif password: client.connect(host, port=port, username=username, password=password, timeout=10) else: raise ValueError("Either key_filename or password must be provided") # Store client with connection identifier connection_id = f"{username}@{host}:{port}" ssh_clients[connection_id] = { 'client': client, 'host': host, 'username': username, 'port': port } return f"Successfully connected to {connection_id}\nRead-only access enabled." except Exception as e: return f"Connection failed: {str(e)}" @mcp.tool() def ssh_execute( host: str, username: str, command: str, port: int = 22 ) -> str: """ Execute a read-only command on the connected remote machine. Args: host: Remote host (must be connected first) username: SSH username command: The read-only command to execute (e.g., 'ls -la', 'cat /etc/hostname') port: SSH port (default: 22) Returns: Command output or error message """ connection_id = f"{username}@{host}:{port}" # Check if connected if connection_id not in ssh_clients: return f"Error: Not connected to {connection_id}. Please connect first using ssh_connect." # Validate command safety if not is_command_safe(command): return f"Error: Command not allowed for security reasons.\nOnly read-only commands are permitted.\nBlocked command: {command}" try: client = ssh_clients[connection_id]['client'] stdin, stdout, stderr = client.exec_command(command, timeout=30) output = stdout.read().decode('utf-8') error = stderr.read().decode('utf-8') exit_code = stdout.channel.recv_exit_status() if exit_code == 0: return output if output else "(command executed successfully with no output)" else: return f"Command failed with exit code {exit_code}\nError: {error if error else 'No error message'}" except paramiko.SSHException as e: return f"SSH error: {str(e)}" except Exception as e: return f"Command execution failed: {str(e)}" @mcp.tool() def ssh_disconnect( host: str, username: str, port: int = 22 ) -> str: """ Disconnect from the remote machine. Args: host: Remote host username: SSH username port: SSH port (default: 22) Returns: Disconnection status message """ connection_id = f"{username}@{host}:{port}" if connection_id in ssh_clients: try: ssh_clients[connection_id]['client'].close() del ssh_clients[connection_id] return f"Successfully disconnected from {connection_id}" except Exception as e: return f"Disconnection error: {str(e)}" else: return f"Not connected to {connection_id}" @mcp.tool() def ssh_list_connections() -> str: """ List all active SSH connections. Returns: List of active connections """ if not ssh_clients: return "No active connections" connections = [] for conn_id in ssh_clients.keys(): connections.append(f" • {conn_id}") return "Active SSH Connections:\n" + "\n".join(connections) @mcp.tool() def ssh_get_allowed_commands() -> str: """ Get the list of allowed read-only commands. Returns: List of allowed commands """ sorted_commands = sorted(ALLOWED_COMMANDS) return "Allowed Read-Only Commands:\n" + ", ".join(sorted_commands) if __name__ == "__main__": logger.info(f"Starting MCP server with transport={transport}") print("=" * 60) print(f"Starting {server_name}") print("=" * 60) print(f"Transport: {transport}") if transport in ["http", "streamable-http"]: print(f"Host: 0.0.0.0") print(f"Port: {os.getenv('MCP_PORT', '3000')}") print(f"Endpoint: http://localhost:{os.getenv('MCP_PORT', '3000')}/mcp") print(f"Broadcasting: {'ENABLED' if enable_broadcast else 'DISABLED'}") if enable_broadcast: print(f"Broadcast interval: {broadcast_interval}s") print(f"Multicast: {SSDP_ADDR}:{MCP_DISCOVERY_PORT}") print("=" * 60) print(f"šŸ“ Total allowed commands: {len(ALLOWED_COMMANDS)}") print("āœ… Available tools:") print(" • ssh_connect - Connect to a remote machine") print(" • ssh_execute - Execute read-only commands") print(" • ssh_disconnect - Disconnect from a machine") print(" • ssh_list_connections - View active connections") print(" • ssh_get_allowed_commands - View allowed commands") print("\nšŸ”’ Read-only mode enforced - write operations blocked") print("=" * 60) # Start broadcaster if in HTTP mode broadcaster = None if transport in ["http", "streamable-http"] and enable_broadcast: broadcaster = MCPBroadcaster(server_name, int(os.getenv("MCP_PORT", "3000")), transport) broadcaster_thread = threading.Thread(target=broadcaster.start_broadcasting, daemon=True) broadcaster_thread.start() try: mcp.run(transport=transport) finally: if broadcaster: broadcaster.stop_broadcasting()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kunwarmahen/ssh-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server