We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/bobbyhiddn/Sympathy-MCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""SSH transport for remote command execution and file operations.
All VM/host interactions go over SSH. This replaces the previous incus exec
approach, enabling cross-machine operation (Pi cluster, remote nodes).
Uses asyncio.create_subprocess_exec with the ssh/scp CLI — no paramiko,
keeping it simple like Ansible does.
"""
from __future__ import annotations

import asyncio
import base64
import os
import re
import shlex
import tempfile
from dataclasses import dataclass
from pathlib import Path

from sympathy_mcp.config import HostConfig, SympathyConfig
# ---------------------------------------------------------------------------
# Result types
# ---------------------------------------------------------------------------
@dataclass
class ExecResult:
    """Captured outcome of a single command run against a remote host."""

    # Decoded text the command wrote to standard output.
    stdout: str
    # Decoded text the command wrote to standard error.
    stderr: str
    # Exit status reported for the command.
    exit_code: int
# ---------------------------------------------------------------------------
# Validation helpers
# ---------------------------------------------------------------------------
# VM/host names: alphanumeric, hyphens, dots (for hostnames)
# ``\Z`` (not ``$``) so that a trailing newline is never accepted.
_VALID_NAME = re.compile(r"^[a-zA-Z][a-zA-Z0-9.\-]*\Z")


def _validate_host_name(name: str) -> None:
    """Validate that a host name is safe.

    A valid name starts with an ASCII letter and otherwise contains only
    ASCII letters, digits, hyphens, and dots.

    Args:
        name: Host name to check.

    Raises:
        ValueError: If the name is empty/blank or contains any character
            outside the allowed set (including a trailing newline).
    """
    if not name or not name.strip():
        raise ValueError("Host name cannot be empty")
    # fullmatch: the *entire* string must conform. With ``match`` and a
    # ``$`` anchor, "host\n" would slip through because ``$`` also matches
    # just before a final newline.
    if not _VALID_NAME.fullmatch(name):
        raise ValueError(
            f"Invalid host name '{name}': must start with a letter, "
            "contain only alphanumeric characters, hyphens, and dots"
        )
def _validate_path(path: str) -> None:
    """Run basic safety checks on a remote file path.

    Args:
        path: Path on the remote host.

    Raises:
        ValueError: If the path is empty or blank, contains a null byte,
            or does not start with ``/``.
    """
    is_blank = not (path and path.strip())
    if is_blank:
        raise ValueError("Path cannot be empty")

    if path.count("\x00") > 0:
        raise ValueError("Path cannot contain null bytes")

    if path[:1] != "/":
        raise ValueError(f"Path must be absolute (start with /): '{path}'")
def _validate_local_path(path: str) -> None:
    """Run basic safety checks on a path on the local machine.

    Unlike the remote-path check, relative paths are accepted here.

    Args:
        path: Path on the local host.

    Raises:
        ValueError: If the path is empty or blank, or contains a null byte.
    """
    is_blank = not (path and path.strip())
    if is_blank:
        raise ValueError("Local path cannot be empty")

    if path.count("\x00") > 0:
        raise ValueError("Path cannot contain null bytes")
# ---------------------------------------------------------------------------
# SSH transport
# ---------------------------------------------------------------------------
async def _run_ssh(
    host_config: HostConfig,
    remote_command: list[str],
    timeout: int = 120,
    stdin_data: bytes | None = None,
) -> ExecResult:
    """Run a command on a remote host via SSH.

    Args:
        host_config: SSH connection details for the target host.
        remote_command: Command and arguments to run remotely. They are
            appended to the ``ssh`` argument list as-is; callers are
            responsible for any quoting the remote side needs.
        timeout: Maximum seconds to wait.
        stdin_data: Optional bytes to pipe to stdin. ``b""`` is a valid
            (empty) payload and is distinct from ``None``.

    Returns:
        ExecResult with stdout, stderr, and exit code. Output is decoded
        as UTF-8 with undecodable bytes replaced.

    Raises:
        TimeoutError: If the command exceeds the timeout.
        OSError: If the ssh binary is not found.
    """
    full_cmd = ["ssh"] + host_config.ssh_args() + remote_command

    # Decide on "is not None", not truthiness: an empty payload must still
    # get a real pipe, otherwise communicate() is handed input with no
    # stdin stream attached (which some asyncio versions try to write to).
    stdin_target = (
        asyncio.subprocess.PIPE
        if stdin_data is not None
        else asyncio.subprocess.DEVNULL
    )

    proc = await asyncio.create_subprocess_exec(
        *full_cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=stdin_target,
    )
    try:
        stdout_bytes, stderr_bytes = await asyncio.wait_for(
            proc.communicate(input=stdin_data),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited on its own just as the timeout fired;
            # there is nothing left to kill.
            pass
        # Reap the child so it does not linger as a zombie.
        await proc.wait()
        raise TimeoutError(
            f"SSH command timed out after {timeout}s on {host_config.name}"
        )

    return ExecResult(
        stdout=stdout_bytes.decode("utf-8", errors="replace"),
        stderr=stderr_bytes.decode("utf-8", errors="replace"),
        exit_code=proc.returncode or 0,
    )
async def _run_scp(
    host_config: HostConfig,
    args: list[str],
    timeout: int = 120,
) -> ExecResult:
    """Invoke the ``scp`` CLI and capture its outcome.

    Args:
        host_config: SSH connection details for the target host.
        args: SCP arguments (source and destination).
        timeout: Maximum seconds to wait.

    Returns:
        ExecResult with stdout, stderr, and exit code. Output is decoded
        as UTF-8 with undecodable bytes replaced.

    Raises:
        TimeoutError: If the transfer exceeds the timeout.
    """
    command_line = ["scp"] + host_config.scp_args() + args
    pipe = asyncio.subprocess.PIPE

    child = await asyncio.create_subprocess_exec(
        *command_line,
        stdout=pipe,
        stderr=pipe,
        stdin=asyncio.subprocess.DEVNULL,
    )

    try:
        raw_out, raw_err = await asyncio.wait_for(
            child.communicate(), timeout=timeout
        )
    except asyncio.TimeoutError:
        # Terminate and reap the child before reporting the timeout.
        child.kill()
        await child.wait()
        raise TimeoutError(
            f"SCP command timed out after {timeout}s on {host_config.name}"
        )

    out_text = raw_out.decode("utf-8", errors="replace")
    err_text = raw_err.decode("utf-8", errors="replace")
    status = child.returncode if child.returncode else 0
    return ExecResult(stdout=out_text, stderr=err_text, exit_code=status)
# ---------------------------------------------------------------------------
# Config-aware wrappers (load config once, look up by name)
# ---------------------------------------------------------------------------
# Module-level config singleton — loaded once on first use
_config: SympathyConfig | None = None


def _get_config() -> SympathyConfig:
    """Return the cached configuration, loading it on the first call."""
    global _config
    if _config is not None:
        return _config
    _config = SympathyConfig.load()
    return _config
def reload_config() -> None:
    """Discard the cached configuration and load it again."""
    global _config
    refreshed = SympathyConfig.load()
    _config = refreshed
def _resolve_host(vm: str) -> HostConfig:
    """Look up the SSH config for a VM/host name.

    The name is validated first, so an unsafe name raises ``ValueError``
    before the configuration is consulted.
    """
    _validate_host_name(vm)
    return _get_config().get_host(vm)
# ---------------------------------------------------------------------------
# Public API — matches the original incus.py interface
# ---------------------------------------------------------------------------
async def vm_exec(
    vm: str,
    command: str,
    workdir: str = "/",
    timeout: int = 120,
) -> ExecResult:
    """Execute a shell command on a remote host via SSH.

    The command is prefixed with ``cd <workdir> &&`` and handed to ssh as
    one argument, so the remote shell interprets it and pipes, redirects,
    etc. work as expected.

    ``workdir`` is shell-quoted: spaces and shell metacharacters in it are
    taken literally and cannot alter the command line. A leading ``~/``
    (or a bare ``~``) is still expanded by the remote shell; other
    expansions such as ``$HOME`` are not performed on ``workdir``.

    Args:
        vm: Name of the host (as configured in hosts.toml).
        command: Shell command to execute.
        workdir: Working directory on the remote host (default: /).
        timeout: Maximum seconds to wait (default: 120).

    Returns:
        ExecResult with stdout, stderr, and exit code.

    Raises:
        ValueError: If the host name is invalid or the command is empty.
        TimeoutError: If the command exceeds the timeout.
    """
    host = _resolve_host(vm)
    if not command or not command.strip():
        raise ValueError("Command cannot be empty")

    if workdir == "~" or workdir.startswith("~/"):
        # Leave the tilde prefix unquoted so the remote shell expands it
        # to the home directory; quote only what follows.
        cd_target = "~/" + shlex.quote(workdir[2:])
    else:
        cd_target = shlex.quote(workdir)

    # SSH concatenates remote args into a single string, so we pass
    # the entire wrapped command as one argument to avoid splitting.
    wrapped = f"cd {cd_target} && {command}"
    return await _run_ssh(
        host,
        [wrapped],
        timeout=timeout,
    )
async def vm_file_read(vm: str, path: str) -> str:
    """Read a file from a remote host via SSH.

    Runs ``cat <path>`` on the remote host with the path shell-quoted.

    Args:
        vm: Name of the host (as configured in hosts.toml).
        path: Absolute path to the file on the remote host.

    Returns:
        File contents as a string.

    Raises:
        ValueError: If the host name or path fails validation.
        RuntimeError: If the file cannot be read.
    """
    host = _resolve_host(vm)
    _validate_path(path)

    # ssh joins its trailing arguments with spaces and the remote shell
    # re-parses the result, so an unquoted path containing spaces or
    # metacharacters (";", "$(...)", ...) would be split or executed.
    # Send one pre-quoted command string instead.
    result = await _run_ssh(host, [f"cat {shlex.quote(path)}"])
    if result.exit_code != 0:
        raise RuntimeError(
            f"Failed to read {path} from {vm}: {result.stderr.strip()}"
        )
    return result.stdout
async def vm_file_write(vm: str, path: str, content: str) -> ExecResult:
    """Write content to a file on a remote host.

    Pipes the UTF-8 encoded content via stdin to ``tee <path> > /dev/null``
    on the remote host, so the content itself never passes through a shell
    command line. The path is shell-quoted.

    Args:
        vm: Name of the host (as configured in hosts.toml).
        path: Absolute path where the file should be written.
        content: The content to write.

    Returns:
        ExecResult with an empty stdout plus the remote stderr and exit
        code.

    Raises:
        ValueError: If the host name or path fails validation.
    """
    host = _resolve_host(vm)
    _validate_path(path)

    # ssh joins its trailing arguments and the remote shell re-parses
    # them, so the path must be quoted to survive spaces/metacharacters.
    # tee's stdout goes to /dev/null on the remote side so the content is
    # not echoed back over the connection.
    remote = f"tee {shlex.quote(path)} > /dev/null"
    result = await _run_ssh(
        host,
        [remote],
        stdin_data=content.encode("utf-8"),
    )
    # stdout carries nothing useful for a write; always report it empty.
    return ExecResult(
        stdout="",
        stderr=result.stderr,
        exit_code=result.exit_code,
    )
async def vm_file_push(
    vm: str,
    local_path: str,
    remote_path: str,
) -> ExecResult:
    """Copy a local file to a remote host using SCP.

    Args:
        vm: Name of the host (as configured in hosts.toml).
        local_path: Path to the file on the local host.
        remote_path: Absolute destination path on the remote host.

    Returns:
        ExecResult with operation details.

    Raises:
        ValueError: If the host name or either path fails validation.
        FileNotFoundError: If ``local_path`` is not an existing file.
    """
    host = _resolve_host(vm)
    _validate_local_path(local_path)
    _validate_path(remote_path)

    source = Path(local_path)
    if not source.is_file():
        raise FileNotFoundError(f"Local file not found: {local_path}")

    destination = f"{host.scp_prefix}{remote_path}"
    return await _run_scp(host, [local_path, destination])
async def vm_file_pull(
    vm: str,
    remote_path: str,
    local_path: str,
) -> ExecResult:
    """Copy a file from a remote host to the local machine using SCP.

    The parent directory of ``local_path`` is created if it is missing.

    Args:
        vm: Name of the host (as configured in hosts.toml).
        remote_path: Absolute path to the file on the remote host.
        local_path: Destination path on the local host.

    Returns:
        ExecResult with operation details.

    Raises:
        ValueError: If the host name or either path fails validation.
    """
    host = _resolve_host(vm)
    _validate_path(remote_path)
    _validate_local_path(local_path)

    # Make sure the destination directory exists before scp writes into it.
    Path(local_path).parent.mkdir(parents=True, exist_ok=True)

    source = f"{host.scp_prefix}{remote_path}"
    return await _run_scp(host, [source, local_path])