# ssh_readonly_fastmcp_mcast.py • 11.8 kB
#!/usr/bin/env python3
"""
SSH Read-Only FastMCP Server with Multicast Discovery
A FastMCP implementation for secure SSH connections with read-only command execution.
Install with: pip install fastmcp paramiko python-dotenv
"""
import os
import json
import socket
import uuid
import threading
import time
import logging
from typing import Any, Dict
from dotenv import load_dotenv
import paramiko
from fastmcp import FastMCP
from mcp.types import TextContent
# Merge settings from a .env file (if present) into the environment.
load_dotenv()

# Log to ssh_mcp.log at INFO level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    filename="ssh_mcp.log",
)
logger = logging.getLogger("ssh_mcp")

# --- Configuration (environment variables with defaults) ---
transport = os.environ.get("MCP_TRANSPORT", "stdio")
server_name = os.environ.get("MCP_SERVER_NAME", "SSH Read-Only MCP Server")
# Only the literal value "true" (any letter case) enables broadcasting.
enable_broadcast = os.environ.get("MCP_ENABLE_BROADCAST", "true").lower() == "true"
# Seconds between two discovery announcements.
broadcast_interval = int(os.environ.get("MCP_BROADCAST_INTERVAL", "30"))

# Multicast group and UDP port the discovery announcements are sent to.
SSDP_ADDR = "239.255.255.250"
MCP_DISCOVERY_PORT = 5353
# Build the FastMCP instance. Only the HTTP transports need a bind address,
# so every other transport gets the plain constructor.
if transport not in ("http", "streamable-http"):
    mcp = FastMCP(name=server_name)
else:
    host = os.environ.get("MCP_HOST", "0.0.0.0")
    port = int(os.environ.get("MCP_PORT", "3000"))
    logger.info(f"Starting MCP server in {transport} mode at {host}:{port}")
    mcp = FastMCP(name=server_name, host=host, port=port, debug=True)
# --- Multicast Broadcaster ---
class MCPBroadcaster:
    """Announce this MCP server on a UDP multicast group at a fixed interval.

    Each announcement is the JSON document built by ``create_announcement``,
    sent to ``SSDP_ADDR:MCP_DISCOVERY_PORT``.  ``start_broadcasting`` blocks
    until ``stop_broadcasting`` is called, so it is intended to be run as the
    target of a background thread.
    """

    def __init__(self, server_name: str, port: int, transport: str):
        """Record what to advertise; no socket is opened here.

        Args:
            server_name: Name placed in every announcement.
            port: Port number placed in every announcement.
            transport: Transport name placed in every announcement.
        """
        self.server_name = server_name
        self.port = port
        self.transport = transport
        self.uuid = str(uuid.uuid4())  # new identifier for each broadcaster instance
        self.running = False
        self.sock = None

    def get_local_ip(self):
        """Return the local IPv4 address used for outbound traffic.

        A datagram socket is connected to a public address only so that the
        OS selects a source address, which is then read back.

        Returns:
            The selected address, or ``'127.0.0.1'`` if it cannot be determined.
        """
        try:
            # The context manager closes the probe socket even if connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                return probe.getsockname()[0]
        except OSError:
            return '127.0.0.1'

    def create_announcement(self):
        """Create the MCP discovery announcement.

        Returns:
            The announcement as UTF-8 encoded JSON bytes.
        """
        local_ip = self.get_local_ip()
        announcement = {
            "type": "mcp-announcement",
            "protocol": "MCP-DISCOVERY-v1",
            "uuid": self.uuid,
            "name": self.server_name,
            "host": local_ip,
            "port": self.port,
            "endpoint": "/mcp",
            "protocol_type": "MCP-HTTP",
            "transport": self.transport,
            "version": "1.0.0"
        }
        return json.dumps(announcement).encode('utf-8')

    def start_broadcasting(self):
        """Send announcements until ``stop_broadcasting`` is called.

        Returns immediately when broadcasting is disabled by configuration.
        Send failures are logged and the loop keeps going.
        """
        if not enable_broadcast:
            logger.info("Broadcasting disabled")
            return

        self.running = True

        # Create UDP socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)

        logger.info(f"Starting multicast broadcaster on {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")
        logger.info(f"Broadcasting every {broadcast_interval} seconds")
        print(f"📻 Broadcasting {self.server_name} on {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")

        # Always wait at least one 0.1 s tick between sends; a zero or negative
        # interval would otherwise turn the loop into a busy spin.
        ticks = max(1, broadcast_interval * 10)

        while self.running:
            try:
                message = self.create_announcement()
                self.sock.sendto(message, (SSDP_ADDR, MCP_DISCOVERY_PORT))
                logger.debug(f"Broadcast sent: {message.decode('utf-8')}")
            except Exception as e:
                if not self.running:
                    # stop_broadcasting() closed the socket while we were
                    # sending; that is a normal shutdown, not an error.
                    break
                logger.error(f"Broadcast error: {e}")

            # Sleep in small intervals so shutdown is responsive
            for _ in range(ticks):
                if not self.running:
                    break
                time.sleep(0.1)

    def stop_broadcasting(self):
        """Stop the broadcast loop and close the socket if one was opened."""
        self.running = False
        if self.sock:
            self.sock.close()
        logger.info("Broadcasting stopped")
# List of allowed read-only commands
ALLOWED_COMMANDS = {
    'cat', 'ls', 'pwd', 'whoami', 'id', 'date', 'uptime', 'ps', 'top',
    'df', 'du', 'free', 'netstat', 'ss', 'ifconfig', 'ip', 'hostname',
    'uname', 'lsb_release', 'file', 'head', 'tail', 'wc', 'grep',
    'find', 'locate', 'which', 'whereis', 'stat', 'lsof', 'mount',
    'dmidecode', 'lscpu', 'lsblk', 'fdisk', 'blkid', 'journalctl',
    'systemctl', 'service', 'curl', 'wget', 'dig', 'nslookup', 'ping',
    'traceroute', 'mtr', 'iptables', 'firewall-cmd', 'ufw', 'awk', 'sed'
}

# Global SSH client storage
ssh_clients = {}


def is_command_safe(command: str) -> bool:
    """Verify that the command is read-only safe.

    A command is accepted only when all of the following hold:

    * it is not empty;
    * it contains no shell metacharacter (redirection, pipe, ``&``, ``;``,
      command substitution) and no embedded line break, so the remote shell
      runs a single simple command;
    * none of its words is a known destructive or privilege-changing command;
    * the name of the first word (without its path) is in ``ALLOWED_COMMANDS``.

    Destructive command names are compared against whole words, so names that
    merely contain one as a substring (``lscpu``/``cp``, ``sed``/``ed``,
    ``service``/``vi``, ``addr``/``dd``) are not rejected.

    NOTE(review): this is a best-effort filter, not a sandbox.  Options of the
    allow-listed tools themselves (for example ``find -delete``, ``sed -i``,
    ``systemctl stop``, ``curl -o``) are not inspected here.  Whole-word
    matching also means ``sed`` and ``service`` pass this check at all, and
    names that only contain a destructive word (``pkill``, ``killall``) are no
    longer rejected -- confirm the allow-list matches the intended policy.

    Args:
        command: The raw command line requested by the caller.

    Returns:
        True if the command passes every check, otherwise False.
    """
    # Leading/trailing whitespace (including a final newline) is harmless.
    stripped = command.strip()
    cmd_parts = stripped.split()
    if not cmd_parts:
        return False

    # Anything that lets the shell chain, redirect or substitute commands.
    # A line break inside the command acts as a command separator as well.
    shell_metacharacters = ('>', '<', '|', '&', ';', '$(', '`', '\n', '\r')
    if any(marker in stripped for marker in shell_metacharacters):
        return False

    dangerous_commands = {
        'rm', 'dd', 'mkfs',
        'chmod', 'chown', 'mv', 'cp', 'touch', 'mkdir', 'rmdir',
        'kill', 'shutdown', 'reboot', 'halt', 'poweroff',
        'su', 'sudo', 'passwd', 'usermod', 'useradd', 'userdel',
        'nano', 'vi', 'vim', 'ed'
    }
    # Break the command into words on every character that is not a letter,
    # digit or underscore, then compare whole words against the deny-list.
    words = ''.join(
        ch if (ch.isalnum() or ch == '_') else ' ' for ch in stripped
    ).split()
    if any(word in dangerous_commands for word in words):
        return False

    base_cmd = cmd_parts[0].split('/')[-1]  # Get the command name without path
    return base_cmd in ALLOWED_COMMANDS
@mcp.tool()
def ssh_connect(
    host: str,
    username: str,
    port: int = 22,
    key_filename: str = None,
    password: str = None
) -> str:
    """
    Establish SSH connection to a remote machine (read-only access only).

    Args:
        host: Remote host IP address or hostname
        username: SSH username
        port: SSH port (default: 22)
        key_filename: Path to private key file (recommended)
        password: SSH password (fallback if no key)

    Returns:
        Connection status message
    """
    # Reject the call before any client object is created.
    if not key_filename and not password:
        return "Connection failed: Either key_filename or password must be provided"

    connection_id = f"{username}@{host}:{port}"
    client = None
    try:
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts host keys that are not already
        # known, so the first connection to a host is not authenticated.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # A key file takes precedence over a password when both are given.
        if key_filename:
            client.connect(host, port=port, username=username,
                           key_filename=key_filename, timeout=10)
        else:
            client.connect(host, port=port, username=username,
                           password=password, timeout=10)
    except Exception as e:
        # Release whatever the failed attempt left open.
        if client is not None:
            client.close()
        return f"Connection failed: {str(e)}"

    # Reconnecting with the same identifier replaces the stored client, so
    # close the old one first instead of leaving it open and unreachable.
    previous = ssh_clients.get(connection_id)
    if previous is not None:
        try:
            previous['client'].close()
        except Exception as e:
            logger.warning(f"Failed to close previous connection {connection_id}: {e}")

    # Store client with connection identifier
    ssh_clients[connection_id] = {
        'client': client,
        'host': host,
        'username': username,
        'port': port
    }
    return f"Successfully connected to {connection_id}\nRead-only access enabled."
@mcp.tool()
def ssh_execute(
    host: str,
    username: str,
    command: str,
    port: int = 22
) -> str:
    """
    Execute a read-only command on the connected remote machine.

    Args:
        host: Remote host (must be connected first)
        username: SSH username
        command: The read-only command to execute (e.g., 'ls -la', 'cat /etc/hostname')
        port: SSH port (default: 22)

    Returns:
        Command output or error message
    """
    connection_id = f"{username}@{host}:{port}"

    # Check if connected
    if connection_id not in ssh_clients:
        return f"Error: Not connected to {connection_id}. Please connect first using ssh_connect."

    # Validate command safety
    if not is_command_safe(command):
        return f"Error: Command not allowed for security reasons.\nOnly read-only commands are permitted.\nBlocked command: {command}"

    try:
        client = ssh_clients[connection_id]['client']
        _stdin, stdout, stderr = client.exec_command(command, timeout=30)

        # Remote output is not guaranteed to be valid UTF-8 (binary files,
        # other locales); substitute undecodable bytes instead of failing.
        output = stdout.read().decode('utf-8', errors='replace')
        error = stderr.read().decode('utf-8', errors='replace')
        exit_code = stdout.channel.recv_exit_status()
    except paramiko.SSHException as e:
        return f"SSH error: {str(e)}"
    except Exception as e:
        return f"Command execution failed: {str(e)}"

    if exit_code == 0:
        return output if output else "(command executed successfully with no output)"

    message = f"Command failed with exit code {exit_code}\nError: {error if error else 'No error message'}"
    # A command can fail partway and still have produced useful output
    # (e.g. a search that hit unreadable directories), so keep it.
    if output:
        message += f"\nOutput: {output}"
    return message
@mcp.tool()
def ssh_disconnect(
    host: str,
    username: str,
    port: int = 22
) -> str:
    """
    Disconnect from the remote machine.

    Args:
        host: Remote host
        username: SSH username
        port: SSH port (default: 22)

    Returns:
        Disconnection status message
    """
    connection_id = f"{username}@{host}:{port}"

    # Nothing to do when this identifier was never registered.
    if connection_id not in ssh_clients:
        return f"Not connected to {connection_id}"

    try:
        entry = ssh_clients[connection_id]
        entry['client'].close()
        # The entry is removed only after the client closed without error.
        del ssh_clients[connection_id]
    except Exception as e:
        return f"Disconnection error: {str(e)}"

    return f"Successfully disconnected from {connection_id}"
@mcp.tool()
def ssh_list_connections() -> str:
    """
    List all active SSH connections.

    Returns:
        List of active connections
    """
    if not ssh_clients:
        return "No active connections"

    # One bulleted line per stored connection identifier.
    listing = "\n".join(f"  • {conn_id}" for conn_id in ssh_clients)
    return "Active SSH Connections:\n" + listing
@mcp.tool()
def ssh_get_allowed_commands() -> str:
    """
    Get the list of allowed read-only commands.

    Returns:
        List of allowed commands
    """
    # Sort so the listing is stable; the allow-list itself is an unordered set.
    listing = ", ".join(sorted(ALLOWED_COMMANDS))
    return f"Allowed Read-Only Commands:\n{listing}"
if __name__ == "__main__":
    import sys

    logger.info(f"Starting MCP server with transport={transport}")

    http_mode = transport in ("http", "streamable-http")

    # With the stdio transport, stdout carries the MCP protocol stream, so the
    # human-readable banner has to go to stderr in that mode.
    banner_stream = sys.stderr if transport == "stdio" else sys.stdout
    rule = "=" * 60

    banner = [
        rule,
        f"Starting {server_name}",
        rule,
        f"Transport: {transport}",
    ]
    if http_mode:
        http_port = os.getenv("MCP_PORT", "3000")
        banner += [
            # Show the configured bind address rather than a fixed value.
            f"Host: {os.getenv('MCP_HOST', '0.0.0.0')}",
            f"Port: {http_port}",
            f"Endpoint: http://localhost:{http_port}/mcp",
            f"Broadcasting: {'ENABLED' if enable_broadcast else 'DISABLED'}",
        ]
        if enable_broadcast:
            banner += [
                f"Broadcast interval: {broadcast_interval}s",
                f"Multicast: {SSDP_ADDR}:{MCP_DISCOVERY_PORT}",
            ]
    banner += [
        rule,
        f"📋 Total allowed commands: {len(ALLOWED_COMMANDS)}",
        "✅ Available tools:",
        "  • ssh_connect - Connect to a remote machine",
        "  • ssh_execute - Execute read-only commands",
        "  • ssh_disconnect - Disconnect from a machine",
        "  • ssh_list_connections - View active connections",
        "  • ssh_get_allowed_commands - View allowed commands",
        "\n🔒 Read-only mode enforced - write operations blocked",
        rule,
    ]
    print("\n".join(banner), file=banner_stream)

    # Start broadcaster if in HTTP mode
    broadcaster = None
    if http_mode and enable_broadcast:
        broadcaster = MCPBroadcaster(server_name, int(os.getenv("MCP_PORT", "3000")), transport)
        # Daemon thread: it must not keep the process alive after the server exits.
        broadcaster_thread = threading.Thread(target=broadcaster.start_broadcasting, daemon=True)
        broadcaster_thread.start()

    try:
        mcp.run(transport=transport)
    finally:
        if broadcaster:
            broadcaster.stop_broadcasting()