# ssh_readonly_fastmcp.py • 6.94 kB
#!/usr/bin/env python3
"""
SSH Read-Only FastMCP Server
A FastMCP implementation for secure SSH connections with read-only command execution.
Install with: pip install fastmcp paramiko
"""
import re
import sys

import paramiko
from fastmcp import FastMCP
from mcp.types import TextContent
# FastMCP server instance named "ssh-readonly"; the @mcp.tool() decorators
# below register each SSH helper on it, and mcp.run() starts it.
mcp = FastMCP("ssh-readonly")
# Allowlist of executable names that is_command_safe() accepts as the first
# word of a command, grouped here by purpose (a set, so order is irrelevant).
# NOTE(review): several entries (e.g. sed, awk, curl, wget, systemctl, service,
# iptables, ufw, fdisk, mount) can change remote state when given the right
# arguments -- confirm each one really belongs in a read-only allowlist.
ALLOWED_COMMANDS = {
    # identity, host and time
    'whoami', 'id', 'hostname', 'uname', 'lsb_release', 'date', 'uptime',
    # files and text
    'cat', 'ls', 'pwd', 'file', 'head', 'tail', 'wc', 'grep', 'awk', 'sed',
    'stat', 'find', 'locate', 'which', 'whereis',
    # processes and resource usage
    'ps', 'top', 'free', 'lsof', 'df', 'du',
    # hardware and storage
    'dmidecode', 'lscpu', 'lsblk', 'fdisk', 'blkid', 'mount',
    # services and logs
    'journalctl', 'systemctl', 'service',
    # networking
    'netstat', 'ss', 'ifconfig', 'ip', 'curl', 'wget', 'dig', 'nslookup',
    'ping', 'traceroute', 'mtr', 'iptables', 'firewall-cmd', 'ufw',
}

# Open connections, keyed by "username@host:port". Each value is a dict holding
# the paramiko client under 'client' plus the 'host', 'username' and 'port'
# that were used to open it.
ssh_clients = {}
# Shell syntax that would let a redirect, a second command or a substitution
# ride along with an allowed command. These are matched as raw substrings.
# Newline and carriage return are included because a shell treats a line break
# as a command separator, so "ls\n<anything>" would otherwise slip through.
_SHELL_METACHARACTERS = ('>', '<', '|', '&', ';', '$(', '`', '\n', '\r')

# Words that must not appear anywhere in the command line. They are compared
# against whole words (see _WORD_PATTERN), never substrings, so that allowed
# commands which merely contain one of them -- 'lscpu' (cp), 'ip addr' (dd),
# 'service' (vi), 'sed' (ed) -- are no longer rejected by accident.
_BLOCKED_WORDS = frozenset({
    'rm', 'dd', 'mkfs',
    'chmod', 'chown', 'mv', 'cp', 'touch', 'mkdir', 'rmdir',
    'kill', 'shutdown', 'reboot', 'halt', 'poweroff',
    'su', 'sudo', 'passwd', 'usermod', 'useradd', 'userdel',
    'nano', 'vi', 'vim', 'ed',
    # Compound names the old substring test only caught incidentally.
    'pkill', 'killall', 'sudoedit', 'visudo',
    # Options that delete files, run another program or edit in place.
    '-delete', '-exec', '-execdir', '-ok', '-okdir', '--in-place',
})

# A "word" is a run of letters, digits, '_', '.' and '-'. Everything else
# (spaces, slashes, quotes, brackets, '=') separates words, so '/bin/rm' and
# 'system("reboot")' still yield the words 'rm' and 'reboot'.
_WORD_PATTERN = re.compile(r'[\w.-]+')


def is_command_safe(command: str) -> bool:
    """Verify that the command is read-only safe.

    A command is accepted only when all of the following hold:

    * it contains no shell metacharacter from ``_SHELL_METACHARACTERS``;
    * it is not empty or whitespace-only;
    * none of its words is listed in ``_BLOCKED_WORDS``;
    * it is not ``sed`` with an in-place (``-i``) short option;
    * the basename of its first word is in ``ALLOWED_COMMANDS``.

    NOTE(review): this is a best-effort filter, not a sandbox. Allowlisted
    programs that can run other programs or write files through their own
    arguments (for example awk's system(), curl -o, systemctl stop) are not
    fully covered here; tighten ALLOWED_COMMANDS if a hard guarantee is needed.

    Args:
        command: The raw command line that would be sent to the remote shell.

    Returns:
        True if the command passes every check, otherwise False.
    """
    if any(meta in command for meta in _SHELL_METACHARACTERS):
        return False

    cmd_parts = command.split()
    if not cmd_parts:
        return False

    # Whole-word comparison against the denylist (see _BLOCKED_WORDS).
    if not _BLOCKED_WORDS.isdisjoint(_WORD_PATTERN.findall(command)):
        return False

    base_cmd = cmd_parts[0].split('/')[-1]  # Get the command name without path

    # sed's in-place flag is a single letter that may be bundled with other
    # short options or carry a suffix (-ni, -i.bak), so it cannot be matched as
    # a whole word; reject any short-option cluster containing 'i'.
    if base_cmd == 'sed' and any(
        arg.startswith('-') and not arg.startswith('--') and 'i' in arg
        for arg in cmd_parts[1:]
    ):
        return False

    return base_cmd in ALLOWED_COMMANDS
@mcp.tool()
def ssh_connect(
    host: str,
    username: str,
    port: int = 22,
    key_filename: str = None,
    password: str = None
) -> str:
    """
    Establish SSH connection to a remote machine (read-only access only).

    On success the client is stored in ``ssh_clients`` under the identifier
    ``username@host:port``. If a connection with the same identifier already
    exists it is replaced and the old client is closed.

    Args:
        host: Remote host IP address or hostname
        username: SSH username
        port: SSH port (default: 22)
        key_filename: Path to private key file (recommended)
        password: SSH password (fallback if no key)

    Returns:
        Connection status message; failures are reported in the returned
        string ("Connection failed: ...") rather than raised.
    """
    # Validate before allocating a client so a bad call leaves nothing behind.
    if not key_filename and not password:
        return "Connection failed: Either key_filename or password must be provided"

    client = paramiko.SSHClient()
    try:
        # NOTE(security): AutoAddPolicy accepts the key of any host that is not
        # already known, so the first connection to a host is not protected
        # against impersonation. Kept for compatibility; consider loading
        # known host keys and using a stricter policy.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if key_filename:
            client.connect(host, port=port, username=username,
                           key_filename=key_filename, timeout=10)
        else:
            client.connect(host, port=port, username=username,
                           password=password, timeout=10)
    except Exception as e:
        # Release the half-open client instead of leaking it.
        client.close()
        return f"Connection failed: {str(e)}"

    # Store client with connection identifier
    connection_id = f"{username}@{host}:{port}"
    previous = ssh_clients.get(connection_id)
    ssh_clients[connection_id] = {
        'client': client,
        'host': host,
        'username': username,
        'port': port
    }

    if previous is not None:
        # Reconnecting under the same identifier: close the replaced client so
        # it is not leaked. Best effort -- the new connection is already
        # stored, so a failure to close the stale one must not fail this call.
        try:
            previous['client'].close()
        except Exception:
            pass

    return f"Successfully connected to {connection_id}\nRead-only access enabled."
@mcp.tool()
def ssh_execute(
    host: str,
    username: str,
    command: str,
    port: int = 22
) -> str:
    """
    Execute a read-only command on the connected remote machine.

    The command is run only if a connection for ``username@host:port`` exists
    in ``ssh_clients`` and ``is_command_safe`` accepts it.

    Args:
        host: Remote host (must be connected first)
        username: SSH username
        command: The read-only command to execute (e.g., 'ls -la', 'cat /etc/hostname')
        port: SSH port (default: 22)

    Returns:
        Command output or error message. On a non-zero exit code the message
        contains the exit code, stderr and, when present, stdout.
    """
    connection_id = f"{username}@{host}:{port}"

    # Check if connected
    entry = ssh_clients.get(connection_id)
    if entry is None:
        return f"Error: Not connected to {connection_id}. Please connect first using ssh_connect."

    # Validate command safety
    if not is_command_safe(command):
        return f"Error: Command not allowed for security reasons.\nOnly read-only commands are permitted.\nBlocked command: {command}"

    try:
        stdin, stdout, stderr = entry['client'].exec_command(command, timeout=30)
        # errors='replace' keeps non-UTF-8 output (binary files, other locales)
        # from turning a successful command into a decode failure.
        output = stdout.read().decode('utf-8', errors='replace')
        error = stderr.read().decode('utf-8', errors='replace')
        exit_code = stdout.channel.recv_exit_status()
    except paramiko.SSHException as e:
        return f"SSH error: {str(e)}"
    except Exception as e:
        return f"Command execution failed: {str(e)}"

    if exit_code == 0:
        return output if output else "(command executed successfully with no output)"

    message = f"Command failed with exit code {exit_code}\nError: {error if error else 'No error message'}"
    if output:
        # Some commands signal a result through the exit code (e.g. grep with
        # no match) or print partial results before failing; keep what they
        # wrote to stdout instead of discarding it.
        message += f"\nOutput: {output}"
    return message
@mcp.tool()
def ssh_disconnect(
    host: str,
    username: str,
    port: int = 22
) -> str:
    """
    Disconnect from the remote machine.

    Args:
        host: Remote host
        username: SSH username
        port: SSH port (default: 22)

    Returns:
        Disconnection status message
    """
    connection_id = f"{username}@{host}:{port}"

    # Remove the entry first: if close() raises, the connection must still
    # disappear from the registry instead of staying there as a stale entry
    # that can never be disconnected.
    entry = ssh_clients.pop(connection_id, None)
    if entry is None:
        return f"Not connected to {connection_id}"

    try:
        entry['client'].close()
    except Exception as e:
        return f"Disconnection error: {str(e)}"
    return f"Successfully disconnected from {connection_id}"
@mcp.tool()
def ssh_list_connections() -> str:
    """
    List all active SSH connections.

    Returns:
        "No active connections" when nothing is stored in ``ssh_clients``,
        otherwise a header line followed by one bulleted
        ``username@host:port`` identifier per line.
    """
    if not ssh_clients:
        return "No active connections"

    # The bullet was mis-encoded in the original literal; restored to U+2022.
    bullet_lines = "\n".join(f"  • {conn_id}" for conn_id in ssh_clients)
    return "Active SSH Connections:\n" + bullet_lines
@mcp.tool()
def ssh_get_allowed_commands() -> str:
    """
    Get the list of allowed read-only commands.

    Returns:
        A header line followed by the entries of ``ALLOWED_COMMANDS`` in
        sorted order, separated by ", ".
    """
    listing = ", ".join(sorted(ALLOWED_COMMANDS))
    return f"Allowed Read-Only Commands:\n{listing}"
if __name__ == "__main__":
    # Startup banner. It is written to stderr, not stdout: when the server runs
    # over the stdio transport, stdout carries the MCP protocol messages and
    # stray text there can corrupt the stream for the client.
    # The decorative glyphs of the original banner were mis-encoded (one of
    # them split a string literal across two lines, a syntax error); plain
    # ASCII is used so the banner prints on any console encoding.
    banner_lines = [
        "SSH Read-Only FastMCP Server Starting...",
        f"Total allowed commands: {len(ALLOWED_COMMANDS)}",
        "Available tools:",
        "  * ssh_connect - Connect to a remote machine",
        "  * ssh_execute - Execute read-only commands",
        "  * ssh_disconnect - Disconnect from a machine",
        "  * ssh_list_connections - View active connections",
        "  * ssh_get_allowed_commands - View allowed commands",
        "",
        "Read-only mode enforced - write operations blocked",
        "",
        "Starting FastMCP server...",
    ]
    print("\n".join(banner_lines), file=sys.stderr)
    mcp.run()