"""Core subprocess helpers for OpenVSP/VSPAero automation."""
from __future__ import annotations
import os
import subprocess
import tempfile
from pathlib import Path
from shutil import which
from typing import Iterable
from .models import OpenVSPRequest, OpenVSPResponse, VSPCommand
def _resolve_binary(env_var: str, default_name: str) -> str:
candidate = os.environ.get(env_var)
if candidate:
return candidate
resolved = which(default_name)
if resolved:
return resolved
return default_name
OPENVSP_BIN = _resolve_binary("OPENVSP_BIN", "vsp")
VSPAERO_BIN = _resolve_binary("VSPAERO_BIN", "vspaero")
def execute_openvsp(request: OpenVSPRequest) -> OpenVSPResponse:
"""Run OpenVSP (and optionally VSPAero) using the provided request."""
with tempfile.TemporaryDirectory(prefix="openvsp_mcp_") as tmpdir:
workdir = Path(tmpdir)
script_path = _write_script(request, workdir)
try:
result = subprocess.run(
[OPENVSP_BIN, "-script", str(script_path)],
check=False,
capture_output=True,
)
except FileNotFoundError as exc: # pragma: no cover
raise RuntimeError("OpenVSP binary not found") from exc
if result.returncode not in _OK_EXIT_CODES:
message = result.stderr.decode("utf-8", errors="ignore").strip()
if not message:
message = result.stdout.decode("utf-8", errors="ignore").strip() or "OpenVSP script execution failed"
raise RuntimeError(message)
vspaero_output: str | None = None
if request.run_vspaero:
try:
aero = subprocess.run(
[VSPAERO_BIN, request.geometry_file, request.case_name],
check=False,
capture_output=True,
)
except FileNotFoundError as exc: # pragma: no cover
raise RuntimeError("VSPAero binary not found") from exc
if aero.returncode != 0:
message = aero.stderr.decode("utf-8", errors="ignore").strip()
if not message:
message = aero.stdout.decode("utf-8", errors="ignore").strip() or "VSPAero execution failed"
raise RuntimeError(message)
vspaero_output = str(Path(request.case_name).with_suffix(".adb"))
return OpenVSPResponse(script_path=str(script_path), result_path=vspaero_output)
def _ensure_statement(command: str) -> str:
stripped = command.strip()
if not stripped.endswith(";"):
stripped = f"{stripped};"
return stripped
def _write_script(request: OpenVSPRequest, working_dir: Path) -> Path:
script_path = working_dir / "automation.vspscript"
commands: Iterable[VSPCommand] = request.set_commands or []
script_lines = [
"// Auto-generated by openvsp-mcp",
"void main() {",
" ClearVSPModel();",
f" ReadVSPFile(\"{request.geometry_file}\");",
]
for command in commands:
script_lines.append(f" {_ensure_statement(command.command)}")
script_lines.extend(
[
" Update();",
f" SetVSP3FileName(\"{request.geometry_file}\");",
f" WriteVSPFile(\"{request.geometry_file}\", SET_ALL);",
"}",
]
)
script_path.write_text("\n".join(script_lines) + "\n", encoding="utf-8")
return script_path
_OK_EXIT_CODES = {0, 160, 224}
__all__ = ["execute_openvsp", "OPENVSP_BIN", "VSPAERO_BIN", "_OK_EXIT_CODES", "_write_script"]