from __future__ import annotations
import importlib.metadata
import os
import platform
import threading
from pathlib import Path
from .models import DependencyRecord, DependencyStatus, EnsureDependenciesResult
from .util import LabError, find_executable, run_command_no_fail, save_json
from .workspace import Workspace
class DependencyBootstrap:
def __init__(self, workspace: Workspace):
self.workspace = workspace
self._lock = threading.Lock()
self._ensured = False
self.paths: dict[str, str] = {}
def ensure(self) -> EnsureDependenciesResult:
with self._lock:
if self._ensured and self.workspace.dep_manifest.exists():
cached = self._load_cached_result()
if cached:
return cached
self.workspace.initialize()
dependencies: list[DependencyRecord] = []
dependencies.append(self._ensure_qemu())
dependencies.append(self._ensure_qemu_img())
dependencies.extend(self._ensure_ssh_tools())
dependencies.append(self._ensure_iso_builder())
dependencies.append(self._python_dependency("fastmcp"))
dependencies.append(self._python_dependency("pycdlib"))
dependencies.append(self._python_dependency("requests"))
if any(item.status == DependencyStatus.MISSING for item in dependencies):
missing = [d.name for d in dependencies if d.status == DependencyStatus.MISSING]
raise LabError(
code="dependency_missing",
message="One or more dependencies could not be installed automatically.",
details={"missing_dependencies": missing},
)
result = EnsureDependenciesResult(
workspace=str(self.workspace.root),
manifest_path=str(self.workspace.dep_manifest),
dependencies=dependencies,
)
manifest_payload = {
"checked_at": result.checked_at,
"os": platform.platform(),
"python": platform.python_version(),
"dependencies": [item.model_dump() for item in dependencies],
"paths": self.paths,
}
save_json(self.workspace.dep_manifest, manifest_payload)
self._ensured = True
return result
def _load_cached_result(self) -> EnsureDependenciesResult | None:
try:
import json
payload = json.loads(self.workspace.dep_manifest.read_text(encoding="utf-8"))
deps = [DependencyRecord.model_validate(item) for item in payload.get("dependencies", [])]
return EnsureDependenciesResult(
workspace=str(self.workspace.root),
manifest_path=str(self.workspace.dep_manifest),
checked_at=payload.get("checked_at"),
dependencies=deps,
)
except Exception:
return None
def _ensure_qemu(self) -> DependencyRecord:
qemu = find_executable(["qemu-system-x86_64.exe", "qemu-system-x86_64"])
if not qemu and os.name == "nt":
self._attempt_winget_install("QEMU.QEMU")
qemu = find_executable(["qemu-system-x86_64.exe", "qemu-system-x86_64"])
if qemu:
status = DependencyStatus.INSTALLED
else:
return DependencyRecord(
name="qemu-system-x86_64",
status=DependencyStatus.MISSING,
details="Install QEMU manually or run with privileges that allow winget user-scope install.",
)
elif not qemu:
return DependencyRecord(
name="qemu-system-x86_64",
status=DependencyStatus.MISSING,
details="qemu-system-x86_64 is not on PATH.",
)
else:
status = DependencyStatus.PRESENT
version = self._command_version([qemu, "--version"])
self.paths["qemu-system-x86_64"] = qemu
return DependencyRecord(name="qemu-system-x86_64", status=status, path=qemu, version=version)
def _ensure_qemu_img(self) -> DependencyRecord:
qemu_img = find_executable(["qemu-img.exe", "qemu-img"])
if not qemu_img and os.name == "nt":
self._attempt_winget_install("QEMU.QEMU")
qemu_img = find_executable(["qemu-img.exe", "qemu-img"])
if qemu_img:
status = DependencyStatus.INSTALLED
else:
return DependencyRecord(
name="qemu-img",
status=DependencyStatus.MISSING,
details="Install QEMU manually or run with privileges that allow winget user-scope install.",
)
elif not qemu_img:
return DependencyRecord(name="qemu-img", status=DependencyStatus.MISSING, details="qemu-img is not on PATH.")
else:
status = DependencyStatus.PRESENT
version = self._command_version([qemu_img, "--version"])
self.paths["qemu-img"] = qemu_img
return DependencyRecord(name="qemu-img", status=status, path=qemu_img, version=version)
def _ensure_ssh_tools(self) -> list[DependencyRecord]:
records: list[DependencyRecord] = []
for tool_name in ("ssh", "scp", "ssh-keygen"):
exe = find_executable([f"{tool_name}.exe", tool_name])
status = DependencyStatus.PRESENT if exe else DependencyStatus.MISSING
if not exe and os.name == "nt":
self._attempt_install_openssh_client()
exe = find_executable([f"{tool_name}.exe", tool_name])
status = DependencyStatus.INSTALLED if exe else DependencyStatus.MISSING
version = None
if exe:
self.paths[tool_name] = exe
if tool_name == "ssh":
version = self._command_version([exe, "-V"], use_stderr=True)
records.append(
DependencyRecord(
name=tool_name,
status=status,
path=exe,
version=version,
details=None
if status != DependencyStatus.MISSING
else "OpenSSH Client is not installed and could not be auto-installed.",
)
)
return records
def _ensure_iso_builder(self) -> DependencyRecord:
try:
version = importlib.metadata.version("pycdlib")
return DependencyRecord(
name="pycdlib_iso_builder",
status=DependencyStatus.PRESENT,
path="python-package:pycdlib",
version=version,
details="Using pycdlib as cloud-init ISO creator.",
)
except importlib.metadata.PackageNotFoundError:
return DependencyRecord(
name="pycdlib_iso_builder",
status=DependencyStatus.MISSING,
details="pycdlib package is missing.",
)
def _python_dependency(self, name: str) -> DependencyRecord:
try:
version = importlib.metadata.version(name)
return DependencyRecord(
name=f"python:{name}",
status=DependencyStatus.PRESENT,
path=f"python-package:{name}",
version=version,
)
except importlib.metadata.PackageNotFoundError:
return DependencyRecord(
name=f"python:{name}",
status=DependencyStatus.MISSING,
details=f"Python package {name} is not installed.",
)
def _attempt_winget_install(self, package_id: str) -> None:
winget = find_executable(["winget.exe", "winget"])
if not winget:
return
command = [
winget,
"install",
"--id",
package_id,
"--silent",
"--accept-package-agreements",
"--accept-source-agreements",
"--scope",
"user",
]
result = run_command_no_fail(command, timeout=600)
if result.returncode != 0:
if "Administrator" in result.stderr or "admin" in result.stderr.lower():
raise LabError(
code="dependency_privilege_required",
message=f"Unable to auto-install {package_id} without elevated privileges.",
details={"stderr": result.stderr, "stdout": result.stdout},
)
def _attempt_install_openssh_client(self) -> None:
if os.name != "nt":
return
cmd = [
"powershell.exe",
"-NoProfile",
"-NonInteractive",
"-Command",
"Add-WindowsCapability -Online -Name OpenSSH.Client~~~~0.0.1.0",
]
result = run_command_no_fail(cmd, timeout=600)
if result.returncode != 0 and (
"Access is denied" in result.stderr
or "administrator" in result.stderr.lower()
or "elevation" in result.stderr.lower()
):
raise LabError(
code="dependency_privilege_required",
message="Unable to install OpenSSH Client without administrator privileges.",
details={"stderr": result.stderr, "stdout": result.stdout},
)
def _command_version(self, args: list[str], use_stderr: bool = False) -> str | None:
result = run_command_no_fail(args)
output = result.stderr if use_stderr else result.stdout
output = output.strip()
if not output:
output = result.stdout.strip() or result.stderr.strip()
if not output:
return None
return output.splitlines()[0].strip()