from __future__ import annotations
import json
import os
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator
from .models import ArtifactMetadata, DebuggerSession, VmDefinition, VmRuntime
from .util import LabError, ensure_dir, load_json, save_json, windows_local_appdata
class Workspace:
def __init__(self, root: Path | None = None):
self.root = root or windows_local_appdata() / "mcp-qemu-lab"
self.base_images_dir = self.root / "base-images"
self.vms_dir = self.root / "vms"
self.artifacts_dir = self.root / "artifacts"
self.logs_dir = self.root / "logs"
self.manifests_dir = self.root / "manifests"
self.state_dir = self.root / "state"
self.keys_dir = self.root / "keys"
self.audit_path = self.logs_dir / "audit.jsonl"
self.dep_manifest = self.manifests_dir / "dependencies.json"
self.vm_defs_path = self.state_dir / "vm_definitions.json"
self.vm_runtime_path = self.state_dir / "vm_runtime.json"
self.vm_runtime_lock_path = self.state_dir / "vm_runtime.lock"
self.artifact_index_path = self.state_dir / "artifacts.json"
self.debug_sessions_path = self.state_dir / "debug_sessions.json"
def initialize(self) -> None:
ensure_dir(self.root)
ensure_dir(self.base_images_dir)
ensure_dir(self.vms_dir)
ensure_dir(self.artifacts_dir)
ensure_dir(self.logs_dir)
ensure_dir(self.manifests_dir)
ensure_dir(self.state_dir)
ensure_dir(self.keys_dir)
if not self.audit_path.exists():
self.audit_path.write_text("", encoding="utf-8")
def vm_dir(self, vm_id: str) -> Path:
return ensure_dir(self.vms_dir / vm_id)
def vm_artifacts_dir(self, vm_id: str) -> Path:
return ensure_dir(self.artifacts_dir / vm_id)
def write_audit_entry(self, entry: dict[str, Any]) -> None:
with self.audit_path.open("a", encoding="utf-8") as fp:
fp.write(json.dumps(entry, sort_keys=True) + "\n")
def load_vm_definitions(self) -> dict[str, VmDefinition]:
raw = load_json(self.vm_defs_path, default={})
return {k: VmDefinition.model_validate(v) for k, v in raw.items()}
def save_vm_definitions(self, defs: dict[str, VmDefinition]) -> None:
save_json(self.vm_defs_path, {k: v.model_dump() for k, v in defs.items()})
def load_vm_runtime(self) -> dict[str, VmRuntime]:
raw = load_json(self.vm_runtime_path, default={})
return {k: VmRuntime.model_validate(v) for k, v in raw.items()}
def save_vm_runtime(self, runtime: dict[str, VmRuntime]) -> None:
save_json(self.vm_runtime_path, {k: v.model_dump() for k, v in runtime.items()})
@contextmanager
def edit_vm_runtime(
self,
*,
lock_timeout_sec: float = 20.0,
stale_lock_sec: float = 300.0,
) -> Iterator[dict[str, VmRuntime]]:
with self._lock_file(
self.vm_runtime_lock_path,
timeout_sec=lock_timeout_sec,
stale_after_sec=stale_lock_sec,
):
runtime = self.load_vm_runtime()
yield runtime
self.save_vm_runtime(runtime)
def load_artifacts(self) -> dict[str, ArtifactMetadata]:
raw = load_json(self.artifact_index_path, default={})
return {k: ArtifactMetadata.model_validate(v) for k, v in raw.items()}
def save_artifacts(self, items: dict[str, ArtifactMetadata]) -> None:
save_json(self.artifact_index_path, {k: v.model_dump() for k, v in items.items()})
def load_debug_sessions(self) -> dict[str, DebuggerSession]:
raw = load_json(self.debug_sessions_path, default={})
return {k: DebuggerSession.model_validate(v) for k, v in raw.items()}
def save_debug_sessions(self, sessions: dict[str, DebuggerSession]) -> None:
save_json(
self.debug_sessions_path,
{k: s.model_dump() for k, s in sessions.items()},
)
@contextmanager
def _lock_file(
self,
lock_path: Path,
*,
timeout_sec: float,
stale_after_sec: float,
poll_sec: float = 0.1,
) -> Iterator[None]:
deadline = time.time() + timeout_sec
lock_fd: int | None = None
while lock_fd is None:
try:
lock_fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
except FileExistsError:
if stale_after_sec > 0 and lock_path.exists():
try:
age = time.time() - lock_path.stat().st_mtime
if age >= stale_after_sec:
lock_path.unlink(missing_ok=True)
continue
except OSError:
pass
if time.time() >= deadline:
raise LabError(
code="state_lock_timeout",
message=f"Timed out waiting for state lock: {lock_path.name}",
details={"path": str(lock_path), "timeout_sec": timeout_sec},
)
time.sleep(poll_sec)
try:
os.write(lock_fd, f"{os.getpid()} {time.time()}\n".encode("utf-8"))
yield
finally:
try:
os.close(lock_fd)
except OSError:
pass
try:
lock_path.unlink(missing_ok=True)
except OSError:
pass