"""Wrappers around common macOS terminal utilities for FastMCP tools."""
from __future__ import annotations
import json
import platform
import shutil
import subprocess
from typing import Iterable, Sequence
from fastmcp.exceptions import ToolError
_COMMAND_TIMEOUT = 30
_SYSTEM_PROFILER_TIMEOUT = 120
_ALLOWED_PROFILES = {
"software": "SPSoftwareDataType",
"hardware": "SPHardwareDataType",
"network": "SPNetworkDataType",
"power": "SPPowerDataType",
}
def _ensure_macos() -> None:
if platform.system() != "Darwin":
raise ToolError("macOS-only tool invoked on a non-macOS host")
def _resolve_executable(executable: str) -> str:
path = shutil.which(executable)
if path is None:
raise ToolError(f"Required executable {executable!r} is not available on PATH")
return path
def _run_command(parts: Sequence[str], *, timeout: int = _COMMAND_TIMEOUT) -> str:
"""Run a command and return its stdout, raising ToolError on failure."""
if not parts:
raise ToolError("No command provided")
_ensure_macos()
resolved = [_resolve_executable(parts[0]), *parts[1:]]
try:
completed = subprocess.run(
resolved,
check=True,
capture_output=True,
text=True,
timeout=timeout,
)
except FileNotFoundError as exc:
raise ToolError(f"Executable {parts[0]!r} is not installed") from exc
except subprocess.TimeoutExpired as exc:
raise ToolError(
f"Command {parts[0]!r} timed out after {timeout} seconds"
) from exc
except subprocess.CalledProcessError as exc:
message = exc.stderr.strip() or exc.stdout.strip() or str(exc)
raise ToolError(
f"Command {parts[0]!r} exited with status {exc.returncode}: {message}"
) from exc
return completed.stdout.strip()
def diskutil_list() -> str:
"""Return the output of ``diskutil list`` for disk overview."""
return _run_command(["diskutil", "list"])
def battery_status() -> str:
"""Return the battery summary from ``pmset -g batt``."""
return _run_command(["pmset", "-g", "batt"])
def network_services() -> str:
"""Return all configured network services via ``networksetup``."""
return _run_command(["networksetup", "-listallnetworkservices"])
def network_service_details(service_name: str) -> str:
"""Return interface details for a given network service."""
if not service_name.strip():
raise ToolError("Network service name must not be empty")
return _run_command(["networksetup", "-getinfo", service_name])
def system_profile(data_type: str = "software") -> dict:
"""Return structured JSON from ``system_profiler`` for a supported data type."""
normalized = data_type.strip().lower()
profiler_type = _ALLOWED_PROFILES.get(normalized, data_type.strip())
if profiler_type not in _ALLOWED_PROFILES.values():
supported = ", ".join(sorted(_ALLOWED_PROFILES))
raise ToolError(
f"Unsupported system_profiler data type {data_type!r}. Supported aliases: {supported}"
)
raw_output = _run_command(
["system_profiler", profiler_type, "-json"],
timeout=_SYSTEM_PROFILER_TIMEOUT,
)
try:
return json.loads(raw_output)
except json.JSONDecodeError as exc:
raise ToolError("Failed to parse system_profiler JSON output") from exc
def available_network_services() -> Iterable[str]:
"""Helper that returns the list of network services, skipping blank lines."""
output = network_services()
for line in output.splitlines():
line = line.strip()
if not line or line.startswith("An asterisk"):
continue
sanitized = line.lstrip('* ').strip()
if sanitized:
yield sanitized