from __future__ import annotations
import io
import logging
import os
import re
import subprocess
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import Any, Iterator
from urllib.parse import urlparse
import pycdlib
import requests
from .bootstrap import DependencyBootstrap
from .guest import GuestConnection, build_allowlisted_command, run_ssh_command, scp_copy_in, scp_copy_out, shell_join
from .models import (
ArtifactMetadata,
DebuggerSession,
GuestExecResult,
NetMode,
ProcessInfo,
ProcessMapEntry,
StopMode,
VmArch,
VmDefinition,
VmRuntime,
VmStatusResult,
path_str,
utc_now_iso,
)
from .qmp import QmpClient, QmpEndpoint
from .util import (
LabError,
acquire_free_tcp_port,
compute_sha256,
ensure_dir,
find_executable,
require,
run_command,
)
from .workspace import Workspace
DEFAULT_UBUNTU_IMAGE = "https://cloud-images.ubuntu.com/noble/current/noble-server-cloudimg-amd64.img"
logger = logging.getLogger(__name__)
def _is_pid_running(pid: int) -> bool:
if pid <= 0:
return False
if os.name == "nt":
result = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}"],
capture_output=True,
text=True,
timeout=15,
check=False,
)
return str(pid) in result.stdout
try:
os.kill(pid, 0)
return True
except OSError:
return False
def _slug(value: str) -> str:
cleaned = re.sub(r"[^a-zA-Z0-9_-]+", "-", value.strip())
return cleaned.strip("-").lower() or "vm"
def _parse_maps(text: str) -> list[ProcessMapEntry]:
    """Parse the text of ``/proc/<pid>/maps`` into structured entries.

    Blank lines, lines with fewer than five columns, and lines whose first
    column is not an address range are skipped. A non-numeric inode column
    is recorded as 0; the optional sixth column becomes ``path``.
    """
    parsed: list[ProcessMapEntry] = []
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        fields = stripped.split(maxsplit=5)
        if len(fields) < 5:
            continue
        addr_range, perms, offset, dev, inode_text = fields[:5]
        mapped_path = fields[5] if len(fields) > 5 else None
        if "-" not in addr_range:
            continue
        start_addr, end_addr = addr_range.split("-", 1)
        try:
            inode = int(inode_text)
        except ValueError:
            inode = 0
        parsed.append(
            ProcessMapEntry(
                start=start_addr,
                end=end_addr,
                perms=perms,
                offset=offset,
                dev=dev,
                inode=inode,
                path=mapped_path,
            )
        )
    return parsed
def _tail_lines(path: Path, line_count: int) -> list[str]:
if line_count <= 0:
return []
if not path.exists():
return []
with path.open("r", encoding="utf-8", errors="replace") as fp:
lines = fp.readlines()
return [line.rstrip("\r\n") for line in lines[-line_count:]]
@dataclass
class AuditContext:
    """Mutable context handed to a tool handler while its audit record is open.

    Handlers attach details, artifacts, and identifiers to ``entry`` before
    the record is persisted by the ``_audit`` context manager.
    """

    tool: str  # name of the tool/operation being audited
    entry: dict[str, Any]  # in-progress audit record being built up
class QemuLab:
def __init__(self, workspace_root: Path | None = None):
    """Initialize the lab: workspace layout, persisted state, and cleanup.

    Loads VM definitions, runtime records, artifacts, and debugger sessions
    from the workspace, then prunes runtime entries for dead QEMU processes
    and makes sure the SSH known_hosts file exists.

    Args:
        workspace_root: Optional override for the workspace location.
    """
    self.workspace = Workspace(workspace_root)
    self.workspace.initialize()
    self.bootstrap = DependencyBootstrap(self.workspace)
    # State maps are loaded eagerly; tool methods mutate them and re-save
    # synchronously via the workspace save_* helpers.
    self.vm_definitions = self.workspace.load_vm_definitions()
    self.vm_runtime = self.workspace.load_vm_runtime()
    self.artifacts = self.workspace.load_artifacts()
    self.debug_sessions = self.workspace.load_debug_sessions()
    self._cleanup_dead_runtime_entries()
    self._ensure_known_hosts_file()
def ensure_dependencies(self):
    """Ensure external dependencies are installed, auditing the manifest path."""
    with self._audit("ensure_dependencies") as ctx:
        bootstrap_result = self.bootstrap.ensure()
        ctx.entry["details"] = {"manifest_path": bootstrap_result.manifest_path}
        return bootstrap_result
def vm_create(
    self,
    *,
    name: str,
    arch: VmArch = VmArch.X86_64,
    cpu: int = 2,
    mem_mb: int = 2048,
    base_image_source: str = DEFAULT_UBUNTU_IMAGE,
    net_mode: NetMode = NetMode.NONE,
    enable_qmp: bool = True,
    base_image_download_timeout_sec: int = 1800,
) -> dict[str, Any]:
    """Create and persist a new VM definition (does not boot it).

    Resolves/downloads the base image, creates a qcow2 overlay on top of it,
    ensures an SSH keypair, and builds a cloud-init seed ISO that injects the
    public key so the guest is reachable over SSH after ``vm_start``.

    Args:
        name: Human-readable VM name; a slug of it is embedded in the vm_id.
        arch: Guest architecture; only x86_64 is accepted currently.
        cpu: vCPU count, must be >= 1.
        mem_mb: Guest RAM in MiB, must be >= 256.
        base_image_source: HTTP(S) URL or local path of the base image.
        net_mode: Networking mode (NONE or USER).
        enable_qmp: Must be true; this server controls VMs via QMP.
        base_image_download_timeout_sec: Budget for the base-image download.

    Returns:
        Summary dict describing the persisted VM definition.
    """
    with self._audit("vm_create") as ctx:
        require(enable_qmp, "qmp_required", "enable_qmp must be true for this server implementation.")
        self.bootstrap.ensure()
        require(arch == VmArch.X86_64, "unsupported_arch", "Only x86_64 is currently supported.", arch=arch.value)
        require(cpu > 0, "invalid_cpu", "cpu must be >= 1.", cpu=cpu)
        require(mem_mb >= 256, "invalid_mem", "mem_mb must be >= 256.", mem_mb=mem_mb)
        require(
            base_image_download_timeout_sec > 0,
            "invalid_timeout",
            "base_image_download_timeout_sec must be > 0.",
            base_image_download_timeout_sec=base_image_download_timeout_sec,
        )
        # vm_id combines a name slug with a short random suffix for uniqueness.
        vm_id = f"vm-{_slug(name)}-{uuid.uuid4().hex[:8]}"
        vm_dir = self.workspace.vm_dir(vm_id)
        base_image_path = self._resolve_base_image(
            base_image_source,
            timeout_sec=float(base_image_download_timeout_sec),
        )
        overlay_path = vm_dir / "disk-overlay.qcow2"
        self._create_overlay(base_image_path, overlay_path)
        key_private, key_public = self._ensure_ssh_keypair()
        seed_iso_path = vm_dir / "seed.iso"
        # The seed ISO carries cloud-init data, including the SSH public key.
        self._create_cloud_init_seed(
            vm_id=vm_id,
            vm_name=name,
            seed_iso_path=seed_iso_path,
            ssh_pubkey=key_public.read_text(encoding="utf-8").strip(),
        )
        vm_def = VmDefinition(
            vm_id=vm_id,
            name=name,
            arch=arch,
            cpu=cpu,
            mem_mb=mem_mb,
            base_image_source=base_image_source,
            base_image_path=path_str(base_image_path),
            overlay_path=path_str(overlay_path),
            seed_iso_path=path_str(seed_iso_path),
            net_mode=net_mode,
            ssh_private_key_path=path_str(key_private),
            ssh_public_key_path=path_str(key_public),
        )
        # Persist the definition before returning so it survives restarts.
        self.vm_definitions[vm_id] = vm_def
        self.workspace.save_vm_definitions(self.vm_definitions)
        ctx.entry["vm_id"] = vm_id
        ctx.entry["details"] = {
            "net_mode": net_mode.value,
            "arch": arch.value,
            "overlay_path": vm_def.overlay_path,
            "seed_iso_path": vm_def.seed_iso_path,
        }
        return {
            "vm_id": vm_def.vm_id,
            "name": vm_def.name,
            "arch": vm_def.arch.value,
            "cpu": vm_def.cpu,
            "mem_mb": vm_def.mem_mb,
            "net_mode": vm_def.net_mode.value,
            "base_image_path": vm_def.base_image_path,
            "overlay_path": vm_def.overlay_path,
            "seed_iso_path": vm_def.seed_iso_path,
        }
def vm_start(self, vm_id: str, qmp_wait_timeout_sec: int = 120, qmp_command_timeout_sec: int = 15) -> VmStatusResult:
    """Boot a defined VM under QEMU (TCG accel) and wait for its QMP socket.

    Idempotent: if the recorded QEMU process is still alive, its current
    status is returned without starting a second instance. Stale runtime
    records for dead processes are removed first.

    Args:
        vm_id: Identifier returned by ``vm_create``.
        qmp_wait_timeout_sec: How long to wait for QMP to accept connections.
        qmp_command_timeout_sec: Per-QMP-command timeout.

    Returns:
        VmStatusResult with the pid and QMP/SSH endpoints.
    """
    with self._audit("vm_start", vm_id=vm_id) as ctx:
        qmp_wait_timeout = self._require_timeout("qmp_wait_timeout_sec", qmp_wait_timeout_sec, min_value=1.0)
        qmp_command_timeout = self._require_timeout("qmp_command_timeout_sec", qmp_command_timeout_sec, min_value=1.0)
        logger.info(
            "vm_start vm_id=%s qmp_wait_timeout_sec=%s qmp_command_timeout_sec=%s",
            vm_id,
            qmp_wait_timeout,
            qmp_command_timeout,
        )
        self.bootstrap.ensure()
        vm_def = self._require_vm(vm_id)
        self._refresh_vm_runtime()
        runtime = self.vm_runtime.get(vm_id)
        if runtime and _is_pid_running(runtime.pid):
            # Already running: report status instead of double-starting.
            status = self._qmp_status(runtime, timeout_sec=qmp_command_timeout)
            return VmStatusResult(
                vm_id=vm_id,
                status=status,
                qmp_endpoint=f"tcp://{runtime.qmp_host}:{runtime.qmp_port}",
                ssh_endpoint=self._format_ssh_endpoint(runtime),
                pid=runtime.pid,
            )
        if runtime:
            # Stale record for a process that has exited.
            self._remove_runtime(vm_id)
        qemu_exe = self._qemu_binary_for_arch(vm_def.arch)
        qmp_port = acquire_free_tcp_port()
        # SSH forwarding is only set up for user-mode networking.
        ssh_port = acquire_free_tcp_port() if vm_def.net_mode == NetMode.USER else None
        log_path = self.workspace.logs_dir / f"{vm_id}-qemu.log"
        serial_log_path = self.workspace.logs_dir / f"{vm_id}-serial.log"
        debug_log_path = self.workspace.logs_dir / f"{vm_id}-qemu-debug.log"
        args = [
            qemu_exe,
            "-name",
            vm_def.name,
            "-machine",
            "accel=tcg",
            "-smp",
            str(vm_def.cpu),
            "-m",
            str(vm_def.mem_mb),
            "-display",
            "none",
            "-serial",
            f"file:{serial_log_path.as_posix()}",
            "-monitor",
            "none",
            "-D",
            str(debug_log_path),
            "-d",
            "guest_errors,unimp",
            "-qmp",
            f"tcp:127.0.0.1:{qmp_port},server=on,wait=off",
            "-drive",
            f"if=virtio,format=qcow2,file={vm_def.overlay_path}",
            "-drive",
            f"if=virtio,media=cdrom,format=raw,file={vm_def.seed_iso_path}",
        ]
        if vm_def.net_mode == NetMode.NONE:
            args.extend(["-net", "none"])
        else:
            args.extend(["-nic", f"user,model=virtio-net-pci,hostfwd=tcp::{ssh_port}-:22"])
        log_file = log_path.open("a", encoding="utf-8")
        try:
            process = subprocess.Popen(
                args,
                stdout=log_file,
                stderr=log_file,
                stdin=subprocess.DEVNULL,
                creationflags=subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0,
            )
        finally:
            # BUG FIX: the parent's copy of the log handle was never closed,
            # leaking one file descriptor per vm_start call. The child process
            # holds its own inherited/duplicated handle, so closing ours here
            # is safe even if Popen raised.
            log_file.close()
        runtime = VmRuntime(
            vm_id=vm_id,
            pid=process.pid,
            qmp_port=qmp_port,
            ssh_host="127.0.0.1" if ssh_port else None,
            ssh_port=ssh_port,
            log_path=path_str(log_path),
            serial_log_path=path_str(serial_log_path),
            debug_log_path=path_str(debug_log_path),
        )
        self._upsert_runtime(runtime)
        self._wait_for_qmp(runtime, timeout_sec=qmp_wait_timeout, qmp_timeout_sec=qmp_command_timeout)
        status = self._qmp_status(runtime, timeout_sec=qmp_command_timeout)
        ctx.entry["details"] = {
            "pid": process.pid,
            "qmp_endpoint": f"tcp://{runtime.qmp_host}:{runtime.qmp_port}",
            "ssh_endpoint": self._format_ssh_endpoint(runtime),
            "log_path": runtime.log_path,
            "serial_log_path": runtime.serial_log_path,
            "debug_log_path": runtime.debug_log_path,
        }
        return VmStatusResult(
            vm_id=vm_id,
            status=status,
            qmp_endpoint=f"tcp://{runtime.qmp_host}:{runtime.qmp_port}",
            ssh_endpoint=self._format_ssh_endpoint(runtime),
            pid=process.pid,
        )
def vm_stop(
    self,
    vm_id: str,
    mode: StopMode = StopMode.GRACEFUL,
    graceful_timeout_sec: int = 45,
    force_timeout_sec: int = 15,
    qmp_command_timeout_sec: int = 10,
) -> VmStatusResult:
    """Stop a running VM, gracefully (ACPI powerdown) or by force.

    GRACEFUL first sends ``system_powerdown`` over QMP and polls for exit;
    if the process is still alive (or mode is FORCE) a QMP ``quit`` is sent,
    and FORCE additionally kills the process at the OS level as a last
    resort. The runtime record is removed on success.

    Raises:
        LabError: ``vm_stop_failed`` when the process outlives all attempts.
    """
    with self._audit("vm_stop", vm_id=vm_id) as ctx:
        graceful_timeout = self._require_timeout("graceful_timeout_sec", graceful_timeout_sec, min_value=1.0)
        force_timeout = self._require_timeout("force_timeout_sec", force_timeout_sec, min_value=1.0)
        qmp_command_timeout = self._require_timeout("qmp_command_timeout_sec", qmp_command_timeout_sec, min_value=1.0)
        logger.info(
            "vm_stop vm_id=%s mode=%s graceful_timeout_sec=%s force_timeout_sec=%s",
            vm_id,
            mode.value,
            graceful_timeout,
            force_timeout,
        )
        runtime = self._require_runtime(vm_id)
        status = "unknown"
        if mode == StopMode.GRACEFUL:
            try:
                with self._qmp(runtime, timeout_sec=qmp_command_timeout) as qmp:
                    qmp.execute("system_powerdown")
            except Exception:
                # Best effort: guest may already be down or QMP unreachable.
                pass
            # Poll once per second until the process exits or budget runs out.
            for _ in range(max(1, int(graceful_timeout))):
                if not _is_pid_running(runtime.pid):
                    break
                time.sleep(1.0)
        if mode == StopMode.FORCE or _is_pid_running(runtime.pid):
            try:
                with self._qmp(runtime, timeout_sec=qmp_command_timeout) as qmp:
                    qmp.execute("quit")
            except Exception:
                # Best effort: fall through to OS-level kill below.
                pass
            force_loops = max(1, int(force_timeout / 0.5))
            for _ in range(force_loops):
                if not _is_pid_running(runtime.pid):
                    break
                time.sleep(0.5)
            if _is_pid_running(runtime.pid) and mode == StopMode.FORCE:
                # Last resort: hard-kill at the OS level (taskkill / SIGKILL).
                if os.name == "nt":
                    subprocess.run(
                        ["taskkill", "/PID", str(runtime.pid), "/F"],
                        capture_output=True,
                        text=True,
                        timeout=15,
                        check=False,
                    )
                else:
                    try:
                        os.kill(runtime.pid, 9)
                    except OSError:
                        pass
        if _is_pid_running(runtime.pid):
            raise LabError(
                code="vm_stop_failed",
                message="VM is still running after stop request.",
                details={"vm_id": vm_id, "pid": runtime.pid, "mode": mode.value},
            )
        status = "stopped"
        self._remove_runtime(vm_id)
        ctx.entry["details"] = {"final_status": status}
        return VmStatusResult(vm_id=vm_id, status=status, pid=None, qmp_endpoint=None, ssh_endpoint=None)
def vm_status(self, vm_id: str, qmp_command_timeout_sec: int = 10) -> dict[str, Any]:
    """Report a VM's status, including endpoints and log paths when running.

    A runtime record whose QEMU process has died is pruned and the VM is
    reported as stopped with endpoint/log fields set to None.
    """
    with self._audit("vm_status", vm_id=vm_id) as ctx:
        qmp_command_timeout = self._require_timeout("qmp_command_timeout_sec", qmp_command_timeout_sec, min_value=1.0)
        vm_def = self._require_vm(vm_id)
        self._refresh_vm_runtime()
        runtime = self.vm_runtime.get(vm_id)
        if runtime and _is_pid_running(runtime.pid):
            payload = {
                "vm_id": vm_id,
                "name": vm_def.name,
                "status": self._qmp_status(runtime, timeout_sec=qmp_command_timeout),
                "pid": runtime.pid,
                "qmp_endpoint": f"tcp://{runtime.qmp_host}:{runtime.qmp_port}",
                "ssh_endpoint": self._format_ssh_endpoint(runtime),
                "log_path": runtime.log_path,
                "serial_log_path": runtime.serial_log_path,
                "debug_log_path": runtime.debug_log_path,
            }
        else:
            if runtime:
                # Stale record for a dead process: drop it from runtime state.
                self._remove_runtime(vm_id)
            payload = {
                "vm_id": vm_id,
                "name": vm_def.name,
                "status": "stopped",
                "pid": None,
                "qmp_endpoint": None,
                "ssh_endpoint": None,
                "log_path": None,
                "serial_log_path": None,
                "debug_log_path": None,
            }
        ctx.entry["details"] = payload
        return payload
def vm_logs_tail(
    self,
    vm_id: str,
    lines: int = 200,
    include_serial: bool = True,
    include_debug: bool = True,
    include_audit_lines: int = 50,
) -> dict[str, Any]:
    """Return the tails of the VM's QEMU/serial/debug logs plus server/audit logs.

    When no runtime record exists the conventional per-VM log paths under the
    workspace logs directory are used. Audit lines are filtered to entries
    that mention this vm_id.

    Args:
        vm_id: Target VM.
        lines: Number of trailing lines per log file.
        include_serial: Include the serial console log tail.
        include_debug: Include the QEMU debug log tail.
        include_audit_lines: Max matching audit lines to return (0 disables).
    """
    with self._audit("vm_logs_tail", vm_id=vm_id) as ctx:
        _ = self._require_vm(vm_id)
        self._refresh_vm_runtime()
        runtime = self.vm_runtime.get(vm_id)
        # Fall back to the conventional path when no runtime record exists.
        qemu_log_path = Path(runtime.log_path) if runtime else self.workspace.logs_dir / f"{vm_id}-qemu.log"
        serial_log_path = (
            Path(runtime.serial_log_path)
            if runtime and runtime.serial_log_path
            else self.workspace.logs_dir / f"{vm_id}-serial.log"
        )
        debug_log_path = (
            Path(runtime.debug_log_path)
            if runtime and runtime.debug_log_path
            else self.workspace.logs_dir / f"{vm_id}-qemu-debug.log"
        )
        server_log_path = self.workspace.logs_dir / "server.log"
        audit_tail: list[str] = []
        if include_audit_lines > 0 and self.workspace.audit_path.exists():
            # Over-read the audit log (5x), then keep only lines that
            # reference this vm_id, truncated to the requested count.
            all_audit = _tail_lines(self.workspace.audit_path, max(lines, include_audit_lines) * 5)
            for line in all_audit:
                if f'"vm_id": "{vm_id}"' in line:
                    audit_tail.append(line)
            audit_tail = audit_tail[-include_audit_lines:]
        payload = {
            "vm_id": vm_id,
            "qemu_log_path": str(qemu_log_path),
            "qemu_log_tail": _tail_lines(qemu_log_path, lines),
            "serial_log_path": str(serial_log_path) if include_serial else None,
            "serial_log_tail": _tail_lines(serial_log_path, lines) if include_serial else [],
            "debug_log_path": str(debug_log_path) if include_debug else None,
            "debug_log_tail": _tail_lines(debug_log_path, lines) if include_debug else [],
            "server_log_path": str(server_log_path),
            "server_log_tail": _tail_lines(server_log_path, lines),
            "audit_tail": audit_tail,
        }
        ctx.entry["details"] = {
            "lines": lines,
            "qemu_log_lines": len(payload["qemu_log_tail"]),
            "serial_log_lines": len(payload["serial_log_tail"]),
            "debug_log_lines": len(payload["debug_log_tail"]),
            "server_log_lines": len(payload["server_log_tail"]),
            "audit_lines": len(payload["audit_tail"]),
        }
        return payload
def guest_wait_ready(
    self,
    vm_id: str,
    timeout_sec: int = 900,
    poll_interval_sec: int = 5,
    wait_for_cloud_init: bool = True,
    require_gdb: bool = True,
) -> dict[str, Any]:
    """Block until the guest is reachable and (optionally) fully provisioned.

    Waits for SSH to accept connections, then optionally for cloud-init to
    finish and for ``gdb`` to be available in the guest. All stages share a
    single deadline derived from *timeout_sec*.

    Args:
        vm_id: Target VM.
        timeout_sec: Overall readiness budget across all stages.
        poll_interval_sec: Delay between SSH connection attempts.
        wait_for_cloud_init: Also wait for cloud-init completion.
        require_gdb: Also verify gdb is present in the guest.
    """
    with self._audit("guest_wait_ready", vm_id=vm_id) as ctx:
        total_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        poll_interval = self._require_timeout("poll_interval_sec", poll_interval_sec, min_value=0.2)
        conn = self._guest_connection(vm_id)
        # One shared deadline: later stages only get the remaining budget.
        deadline = time.time() + total_timeout
        ssh_info = self._wait_for_guest_ssh_ready(conn, timeout_sec=total_timeout, poll_sec=poll_interval)
        cloud_init: dict[str, Any] | None = None
        if wait_for_cloud_init:
            remaining = max(1.0, deadline - time.time())
            cloud_init = self._wait_for_cloud_init(conn, timeout_sec=remaining)
        checks: dict[str, bool] = {}
        if require_gdb:
            remaining = max(1.0, deadline - time.time())
            self._ensure_guest_command(conn, "gdb", timeout_sec=remaining)
            checks["gdb"] = True
        payload = {
            "vm_id": vm_id,
            "ssh_ready": True,
            "ssh_attempts": ssh_info["attempts"],
            "wait_for_cloud_init": wait_for_cloud_init,
            "cloud_init": cloud_init,
            "require_gdb": require_gdb,
            "checks": checks,
        }
        ctx.entry["details"] = payload
        logger.info(
            "guest_wait_ready vm_id=%s ssh_attempts=%s cloud_init=%s checks=%s",
            vm_id,
            payload["ssh_attempts"],
            cloud_init["status"] if cloud_init else "skipped",
            checks,
        )
        return payload
def vm_snapshot_save(self, vm_id: str, snapshot_name: str, timeout_sec: int = 120) -> dict[str, Any]:
    """Save a live internal snapshot via the HMP ``savevm`` command."""
    with self._audit("vm_snapshot_save", vm_id=vm_id) as ctx:
        hmp_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        runtime = self._require_runtime(vm_id)
        require(bool(snapshot_name.strip()), "invalid_snapshot_name", "snapshot_name cannot be empty.")
        with self._qmp(runtime, timeout_sec=hmp_timeout) as qmp:
            qmp.human_monitor_command(f"savevm {snapshot_name}")
        ctx.entry["details"] = {"snapshot_name": snapshot_name}
        return {"vm_id": vm_id, "snapshot_name": snapshot_name, "status": "saved"}
def vm_snapshot_load(self, vm_id: str, snapshot_name: str, timeout_sec: int = 120) -> dict[str, Any]:
    """Restore a live internal snapshot via the HMP ``loadvm`` command."""
    with self._audit("vm_snapshot_load", vm_id=vm_id) as ctx:
        hmp_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        runtime = self._require_runtime(vm_id)
        require(bool(snapshot_name.strip()), "invalid_snapshot_name", "snapshot_name cannot be empty.")
        with self._qmp(runtime, timeout_sec=hmp_timeout) as qmp:
            qmp.human_monitor_command(f"loadvm {snapshot_name}")
        ctx.entry["details"] = {"snapshot_name": snapshot_name}
        return {"vm_id": vm_id, "snapshot_name": snapshot_name, "status": "loaded"}
def guest_exec(
    self,
    vm_id: str,
    allowed_command: str,
    args: list[str] | None = None,
    unsafe_allow_arbitrary_commands: bool = False,
    unsafe_command: str | None = None,
    timeout_sec: int = 120,
) -> GuestExecResult:
    """Run a command inside the guest over SSH.

    By default the command must pass ``build_allowlisted_command``. When
    ``unsafe_allow_arbitrary_commands`` is true, ``unsafe_command`` is run
    verbatim via ``sh -lc`` instead — callers opt into that risk explicitly.

    Returns:
        GuestExecResult carrying exit code, captured output, and timeout flag.
    """
    with self._audit("guest_exec", vm_id=vm_id) as ctx:
        command_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        conn = self._guest_connection(vm_id)
        args = args or []
        if unsafe_allow_arbitrary_commands:
            require(
                bool(unsafe_command and unsafe_command.strip()),
                "unsafe_command_required",
                "unsafe_command must be provided when unsafe_allow_arbitrary_commands=true.",
            )
            # Arbitrary path: run the string through a login shell.
            command_args = ["sh", "-lc", unsafe_command or ""]
        else:
            command_args = build_allowlisted_command(allowed_command, args)
        result = run_ssh_command(conn, command_args, timeout=command_timeout)
        result.vm_id = vm_id
        ctx.entry["command"] = shell_join(command_args)
        # Previews keep the audit log bounded for chatty commands.
        ctx.entry["details"] = {
            "exit_code": result.exit_code,
            "timed_out": result.timed_out,
            "stdout_preview": result.stdout[:200],
            "stderr_preview": result.stderr[:200],
            "timeout_sec": command_timeout,
        }
        logger.debug(
            "guest_exec vm_id=%s command=%s exit_code=%s timed_out=%s",
            vm_id,
            ctx.entry["command"],
            result.exit_code,
            result.timed_out,
        )
        return result
def guest_copy_out(
    self,
    vm_id: str,
    guest_path: str,
    host_subdir: str = "copied",
    timeout_sec: int = 180,
) -> dict[str, Any]:
    """Copy a single file out of the guest into the VM's artifact directory.

    The file is registered as an artifact with sha256/size metadata.

    Args:
        vm_id: Target VM.
        guest_path: Absolute path of the file inside the guest.
        host_subdir: Subdirectory (slugged) under the VM artifacts dir.
        timeout_sec: SCP transfer timeout.

    Returns:
        Dict with the guest path and the registered artifact metadata.
    """
    with self._audit("guest_copy_out", vm_id=vm_id) as ctx:
        command_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        conn = self._guest_connection(vm_id)
        require(guest_path.startswith("/"), "invalid_guest_path", "guest_path must be an absolute path.")
        safe_subdir = _slug(host_subdir)
        target_dir = ensure_dir(self.workspace.vm_artifacts_dir(vm_id) / safe_subdir)
        filename = PurePosixPath(guest_path).name or "guest_file.bin"
        host_path = target_dir / filename
        scp_copy_out(conn, guest_path=guest_path, host_path=str(host_path), recursive=False, timeout=command_timeout)
        # BUG FIX: the label previously contained a literal "(unknown)"
        # placeholder instead of interpolating the copied file's name.
        artifact = self._register_artifact(
            path=host_path,
            vm_id=vm_id,
            label=f"guest_copy_out:{filename}",
            provenance={"tool": "guest_copy_out", "guest_path": guest_path},
        )
        ctx.entry["artifacts"].append(artifact.model_dump())
        return {"vm_id": vm_id, "guest_path": guest_path, "artifact": artifact.model_dump()}
def guest_copy_in(
    self,
    vm_id: str,
    host_path: str,
    guest_path: str,
    timeout_sec: int = 180,
) -> dict[str, Any]:
    """Copy a single host file into the guest, verifying with sha256.

    Creates the destination directory in the guest first, then transfers the
    file over SCP and reads back the guest-side sha256 for comparison.

    Args:
        vm_id: Target VM.
        host_path: Existing host file to copy.
        guest_path: Absolute destination path inside the guest.
        timeout_sec: SCP transfer timeout.

    Raises:
        LabError: ``guest_prepare_path_failed`` when mkdir in the guest fails.
    """
    with self._audit("guest_copy_in", vm_id=vm_id) as ctx:
        command_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        conn = self._guest_connection(vm_id)
        src_path = Path(host_path)
        require(src_path.exists(), "host_file_not_found", "host_path does not exist.", host_path=host_path)
        require(src_path.is_file(), "host_path_not_file", "host_path must reference a file.", host_path=host_path)
        require(guest_path.startswith("/"), "invalid_guest_path", "guest_path must be an absolute path.")
        # Make sure the destination directory exists before the transfer.
        guest_parent = PurePosixPath(guest_path).parent.as_posix() or "/"
        mkdir_cmd = ["mkdir", "-p", guest_parent]
        mkdir_result = run_ssh_command(conn, mkdir_cmd, timeout=30.0)
        if mkdir_result.exit_code != 0:
            raise LabError(
                code="guest_prepare_path_failed",
                message="Failed to create destination directory in guest.",
                details={"guest_path": guest_path, "stderr": mkdir_result.stderr, "stdout": mkdir_result.stdout},
            )
        scp_copy_in(
            conn,
            host_path=str(src_path.resolve()),
            guest_path=guest_path,
            recursive=False,
            timeout=command_timeout,
        )
        # Read back the guest-side digest; None when sha256sum fails there.
        stat_result = run_ssh_command(conn, ["sha256sum", guest_path], timeout=30.0)
        guest_sha256 = None
        if stat_result.exit_code == 0:
            parts = stat_result.stdout.strip().split()
            if parts:
                guest_sha256 = parts[0]
        payload = {
            "vm_id": vm_id,
            "host_path": str(src_path.resolve()),
            "guest_path": guest_path,
            "size": src_path.stat().st_size,
            "sha256": compute_sha256(src_path),
            "guest_sha256": guest_sha256,
        }
        ctx.entry["details"] = payload
        return payload
def process_list(self, vm_id: str, filter: str | None = None, timeout_sec: int = 30) -> dict[str, Any]:
    """List guest processes via ``ps -eo pid,user,args --no-headers``.

    Args:
        vm_id: Target VM.
        filter: Optional substring; only processes whose command line contains
            it are returned. (Name shadows the builtin, kept for API
            compatibility with existing callers.)
        timeout_sec: SSH command timeout.

    Raises:
        LabError: ``guest_command_failed`` when ``ps`` exits non-zero.
    """
    with self._audit("process_list", vm_id=vm_id) as ctx:
        command_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        conn = self._guest_connection(vm_id)
        result = run_ssh_command(conn, ["ps", "-eo", "pid,user,args", "--no-headers"], timeout=command_timeout)
        if result.exit_code != 0:
            raise LabError(
                code="guest_command_failed",
                message="Failed to list guest processes.",
                details={"stderr": result.stderr, "stdout": result.stdout},
            )
        processes: list[ProcessInfo] = []
        for line in result.stdout.splitlines():
            line = line.strip()
            if not line:
                continue
            # Columns: pid, user, then the full command line (may contain spaces).
            m = re.match(r"^\s*([0-9]+)\s+(\S+)\s+(.*)$", line)
            if not m:
                continue
            proc = ProcessInfo(pid=int(m.group(1)), user=m.group(2), cmdline=m.group(3))
            if filter and filter not in proc.cmdline:
                continue
            processes.append(proc)
        ctx.entry["details"] = {"count": len(processes)}
        return {"vm_id": vm_id, "count": len(processes), "processes": [p.model_dump() for p in processes]}
def process_maps(self, vm_id: str, pid: int, timeout_sec: int = 30) -> dict[str, Any]:
    """Read and parse ``/proc/<pid>/maps`` from the guest.

    Raises:
        LabError: ``process_maps_failed`` when the file cannot be read.
    """
    with self._audit("process_maps", vm_id=vm_id, pid=pid) as ctx:
        ssh_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        conn = self._guest_connection(vm_id)
        maps_result = run_ssh_command(conn, ["cat", f"/proc/{pid}/maps"], timeout=ssh_timeout)
        if maps_result.exit_code != 0:
            raise LabError(
                code="process_maps_failed",
                message="Failed to read /proc/<pid>/maps.",
                details={"pid": pid, "stderr": maps_result.stderr, "stdout": maps_result.stdout},
            )
        regions = _parse_maps(maps_result.stdout)
        ctx.entry["details"] = {"regions": len(regions)}
        return {"vm_id": vm_id, "pid": pid, "maps": [region.model_dump() for region in regions]}
def process_dump_core(
    self,
    vm_id: str,
    pid: int,
    reason_label: str | None = None,
    gdb_timeout_sec: int = 600,
    copy_timeout_sec: int = 300,
) -> dict[str, Any]:
    """Generate a core dump of a guest process via gdb's ``gcore`` and fetch it.

    Ensures debug tools are installed and the pid is alive, runs gdb in batch
    mode to write the core to /tmp in the guest, copies the core to the host
    artifact store, and cleans up the guest-side file(s).

    Args:
        vm_id: Target VM.
        pid: Guest pid to dump.
        reason_label: Optional label used in the artifact filename/metadata.
        gdb_timeout_sec: Budget for the in-guest gcore run.
        copy_timeout_sec: Budget for copying the core out over SCP.

    Raises:
        LabError: ``gcore_timeout`` or ``gcore_failed`` on gdb problems.
    """
    with self._audit("process_dump_core", vm_id=vm_id, pid=pid) as ctx:
        gdb_timeout = self._require_timeout("gdb_timeout_sec", gdb_timeout_sec, min_value=1.0)
        copy_timeout = self._require_timeout("copy_timeout_sec", copy_timeout_sec, min_value=1.0)
        logger.info("process_dump_core vm_id=%s pid=%s gdb_timeout_sec=%s", vm_id, pid, gdb_timeout)
        conn = self._guest_connection(vm_id)
        self._ensure_guest_debug_tools(vm_id, conn, timeout_sec=min(300.0, gdb_timeout), wait_for_cloud_init=True)
        self._ensure_guest_pid_running(conn, pid, timeout_sec=min(30.0, gdb_timeout))
        timestamp = int(time.time())
        guest_core_path = f"/tmp/core.{pid}.{timestamp}"
        # Batch-mode gdb: attach, gcore, detach, quit — no interaction.
        gdb_cmd = [
            "gdb",
            "--batch",
            "--quiet",
            "-p",
            str(pid),
            "-ex",
            "set pagination off",
            "-ex",
            f"gcore {guest_core_path}",
            "-ex",
            "detach",
            "-ex",
            "quit",
        ]
        gdb_result = self._run_gdb_with_optional_sudo(conn, gdb_cmd, timeout_sec=gdb_timeout)
        if gdb_result.timed_out:
            raise LabError(
                code="gcore_timeout",
                message="Timed out generating core dump in guest.",
                details={"pid": pid, "timeout_sec": gdb_timeout},
            )
        if gdb_result.exit_code != 0 or self._gdb_attach_failed(gdb_result.stdout, gdb_result.stderr):
            raise LabError(
                code="gcore_failed",
                message="Failed to generate core dump in guest.",
                details={"pid": pid, "stderr": gdb_result.stderr, "stdout": gdb_result.stdout},
            )
        # gcore may write to a variant of the requested name; resolve from the
        # gdb output and the guest filesystem.
        resolved_guest_core_path = self._resolve_guest_core_path(
            conn,
            requested_path=guest_core_path,
            gdb_stdout=gdb_result.stdout,
            gdb_stderr=gdb_result.stderr,
            timeout_sec=min(30.0, gdb_timeout),
        )
        target_dir = ensure_dir(self.workspace.vm_artifacts_dir(vm_id) / "core-dumps")
        local_path = target_dir / f"{reason_label or 'core'}-{pid}-{timestamp}.core"
        scp_copy_out(
            conn,
            guest_path=resolved_guest_core_path,
            host_path=str(local_path),
            recursive=False,
            timeout=copy_timeout,
        )
        # Best-effort cleanup of both the resolved file and any name variants.
        run_ssh_command(conn, ["sh", "-lc", f"rm -f {shell_join([resolved_guest_core_path])} {shell_join([guest_core_path])}*"], timeout=30.0)
        artifact = self._register_artifact(
            path=local_path,
            vm_id=vm_id,
            label=reason_label or "core_dump",
            provenance={
                "tool": "process_dump_core",
                "pid": pid,
                "requested_guest_path": guest_core_path,
                "resolved_guest_path": resolved_guest_core_path,
            },
        )
        ctx.entry["artifacts"].append(artifact.model_dump())
        return {"vm_id": vm_id, "pid": pid, "artifact": artifact.model_dump()}
def guest_dump_memory(
    self,
    vm_id: str,
    output_label: str | None = None,
    compress: bool = True,
    timeout_sec: int = 1800,
) -> dict[str, Any]:
    """Dump all guest RAM via QEMU's ``dump-guest-memory`` HMP command.

    The dump is written by QEMU directly to a file in the VM's artifact
    directory; this method then waits for the output file to stop growing
    before registering it as an artifact.

    Args:
        vm_id: Target VM (must be running).
        output_label: Optional label (slugged) for the artifact filename.
        compress: Pass ``-z`` to request a compressed (kdump-style) dump.
        timeout_sec: Overall budget for issuing the dump and waiting on the file.

    Raises:
        LabError: ``guest_memory_dump_failed`` when QEMU rejects the command.
    """
    with self._audit("guest_dump_memory", vm_id=vm_id) as ctx:
        dump_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=5.0)
        logger.info("guest_dump_memory vm_id=%s timeout_sec=%s", vm_id, dump_timeout)
        runtime = self._require_runtime(vm_id)
        label = _slug(output_label or "guest-memory")
        extension = "dump.gz" if compress else "dump"
        output_path = self.workspace.vm_artifacts_dir(vm_id) / f"{label}-{int(time.time())}.{extension}"
        # QEMU writes the file itself; give it a POSIX-style absolute path.
        qemu_output_path = output_path.resolve().as_posix()
        hmp = "dump-guest-memory"
        if compress:
            hmp += " -z"
        hmp += f' "{qemu_output_path}"'
        start = time.time()
        # The QMP command itself gets a bounded slice of the overall budget.
        qmp_timeout = min(60.0, max(5.0, dump_timeout / 3.0))
        with self._qmp(runtime, timeout_sec=qmp_timeout) as qmp:
            hmp_result = qmp.human_monitor_command(hmp)
        logger.debug(
            "guest_dump_memory vm_id=%s hmp_command=%s hmp_result=%s",
            vm_id,
            hmp,
            str(hmp_result)[:200],
        )
        if isinstance(hmp_result, str):
            # HMP reports failures as text rather than raising.
            lowered = hmp_result.lower()
            if "error" in lowered or "unsupported" in lowered:
                raise LabError(
                    code="guest_memory_dump_failed",
                    message="QEMU rejected dump-guest-memory command.",
                    details={"hmp_result": hmp_result, "qemu_output_path": qemu_output_path},
                )
        # Wait for the file to exist and stabilize within the remaining budget.
        wait_timeout = max(1.0, dump_timeout - (time.time() - start))
        self._wait_for_file_stable(output_path, timeout_sec=wait_timeout, poll_sec=1.0, stable_polls=2)
        artifact = self._register_artifact(
            path=output_path,
            vm_id=vm_id,
            label=label,
            provenance={"tool": "guest_dump_memory", "compress": compress},
        )
        ctx.entry["artifacts"].append(artifact.model_dump())
        return {"vm_id": vm_id, "artifact": artifact.model_dump()}
def debugger_attach(self, vm_id: str, pid: int, ready_timeout_sec: int = 600, pid_check_timeout_sec: int = 30) -> dict[str, Any]:
    """Register a debugger session for a guest pid.

    Verifies debug tooling is installed in the guest and the target pid
    exists, then records and persists a new session. gdb itself is invoked
    per-operation by the other debugger_* methods.
    """
    with self._audit("debugger_attach", vm_id=vm_id, pid=pid) as ctx:
        ready_timeout = self._require_timeout("ready_timeout_sec", ready_timeout_sec, min_value=1.0)
        pid_check_timeout = self._require_timeout("pid_check_timeout_sec", pid_check_timeout_sec, min_value=1.0)
        logger.info(
            "debugger_attach vm_id=%s pid=%s ready_timeout_sec=%s pid_check_timeout_sec=%s",
            vm_id,
            pid,
            ready_timeout,
            pid_check_timeout,
        )
        conn = self._guest_connection(vm_id)
        self._ensure_guest_debug_tools(vm_id, conn, timeout_sec=ready_timeout, wait_for_cloud_init=True)
        self._ensure_guest_pid_running(conn, pid, timeout_sec=pid_check_timeout)
        new_session_id = f"dbg-{uuid.uuid4().hex[:10]}"
        new_session = DebuggerSession(session_id=new_session_id, vm_id=vm_id, pid=pid)
        self.debug_sessions[new_session_id] = new_session
        self.workspace.save_debug_sessions(self.debug_sessions)
        ctx.entry["details"] = {"session_id": new_session_id}
        return new_session.model_dump()
def debugger_set_breakpoint(self, session_id: str, location: str) -> dict[str, Any]:
    """Add a breakpoint location to a session (applied on the next continue)."""
    with self._audit("debugger_set_breakpoint") as ctx:
        session = self._require_session(session_id)
        require(bool(location.strip()), "invalid_breakpoint", "Breakpoint location cannot be empty.")
        # Deduplicate: the same location is only stored once.
        if location not in session.breakpoints:
            session.breakpoints.append(location)
            self.debug_sessions[session_id] = session
            self.workspace.save_debug_sessions(self.debug_sessions)
        ctx.entry["vm_id"] = session.vm_id
        ctx.entry["pid"] = session.pid
        return {"session_id": session_id, "breakpoints": session.breakpoints}
def debugger_continue(self, session_id: str, timeout_sec: int = 120) -> dict[str, Any]:
    """Attach gdb to the session's pid, set breakpoints, continue, detach.

    gdb runs in batch mode for each call: every stored breakpoint is set,
    execution is continued once, then gdb detaches and quits.

    Raises:
        LabError: ``debugger_continue_timeout`` on gdb timeout;
            ``debugger_continue_failed`` when attach is refused or fails.
    """
    with self._audit("debugger_continue") as ctx:
        command_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        session = self._require_session(session_id)
        conn = self._guest_connection(session.vm_id)
        self._ensure_guest_debug_tools(session.vm_id, conn, timeout_sec=min(120.0, command_timeout), wait_for_cloud_init=False)
        self._ensure_guest_pid_running(conn, session.pid, timeout_sec=min(20.0, command_timeout))
        gdb_cmd = [
            "gdb",
            "--batch",
            "--quiet",
            "-p",
            str(session.pid),
            "-ex",
            "set pagination off",
        ]
        # Apply every breakpoint stored on the session before continuing.
        for bp in session.breakpoints:
            gdb_cmd.extend(["-ex", f"break {bp}"])
        gdb_cmd.extend(["-ex", "continue", "-ex", "detach", "-ex", "quit"])
        result = self._run_gdb_with_optional_sudo(conn, gdb_cmd, timeout_sec=command_timeout)
        if result.timed_out:
            raise LabError(
                code="debugger_continue_timeout",
                message="Timed out while continuing debugger session.",
                details={"session_id": session_id, "timeout_sec": command_timeout},
            )
        if self._gdb_attach_failed(result.stdout, result.stderr):
            raise LabError(
                code="debugger_continue_failed",
                message="Debugger continue failed due to attach restrictions or target state.",
                details={"session_id": session_id, "stdout": result.stdout, "stderr": result.stderr},
            )
        ctx.entry["vm_id"] = session.vm_id
        ctx.entry["pid"] = session.pid
        return {
            "session_id": session_id,
            "exit_code": result.exit_code,
            "stdout": result.stdout,
            "stderr": result.stderr,
        }
def debugger_read_registers(self, session_id: str, timeout_sec: int = 60) -> dict[str, Any]:
    """Read the target process registers via a batch gdb ``info registers``.

    Attaches, prints registers, detaches. The output is parsed into a
    name -> first-value-column mapping; the raw gdb stdout is also returned.

    Raises:
        LabError: ``debugger_read_registers_timeout`` or
            ``debugger_read_registers_failed`` on gdb problems.
    """
    with self._audit("debugger_read_registers") as ctx:
        command_timeout = self._require_timeout("timeout_sec", timeout_sec, min_value=1.0)
        session = self._require_session(session_id)
        conn = self._guest_connection(session.vm_id)
        self._ensure_guest_debug_tools(session.vm_id, conn, timeout_sec=min(120.0, command_timeout), wait_for_cloud_init=False)
        self._ensure_guest_pid_running(conn, session.pid, timeout_sec=min(20.0, command_timeout))
        gdb_cmd = [
            "gdb",
            "--batch",
            "--quiet",
            "-p",
            str(session.pid),
            "-ex",
            "set pagination off",
            "-ex",
            "info registers",
            "-ex",
            "detach",
            "-ex",
            "quit",
        ]
        result = self._run_gdb_with_optional_sudo(conn, gdb_cmd, timeout_sec=command_timeout)
        if result.timed_out:
            raise LabError(
                code="debugger_read_registers_timeout",
                message="Timed out reading registers from process with gdb.",
                details={"session_id": session_id, "timeout_sec": command_timeout},
            )
        if result.exit_code != 0 or self._gdb_attach_failed(result.stdout, result.stderr):
            raise LabError(
                code="debugger_read_registers_failed",
                message="Failed to read registers from process with gdb.",
                details={"stderr": result.stderr, "stdout": result.stdout},
            )
        registers: dict[str, str] = {}
        for line in result.stdout.splitlines():
            line = line.strip()
            # "info registers" lines look like: "<name>  <value>  <alt-value>";
            # keep only the first value column. Non-matching lines are skipped.
            m = re.match(r"^([A-Za-z0-9_]+)\s+(\S+)\s*(.*)$", line)
            if not m:
                continue
            registers[m.group(1)] = m.group(2)
        ctx.entry["vm_id"] = session.vm_id
        ctx.entry["pid"] = session.pid
        return {"session_id": session_id, "registers": registers, "raw": result.stdout}
def debugger_detach(self, session_id: str) -> dict[str, Any]:
    """Mark a debugger session inactive and persist the session store."""
    with self._audit("debugger_detach") as ctx:
        session = self._require_session(session_id)
        # Sessions are soft-deleted: flagged inactive rather than removed.
        session.active = False
        self.debug_sessions[session_id] = session
        self.workspace.save_debug_sessions(self.debug_sessions)
        ctx.entry["vm_id"] = session.vm_id
        ctx.entry["pid"] = session.pid
        return {"session_id": session_id, "status": "detached"}
def artifacts_list(self, vm_id: str | None = None) -> dict[str, Any]:
    """List registered artifacts, newest first, optionally scoped to one VM."""
    with self._audit("artifacts_list", vm_id=vm_id) as ctx:
        items = [
            artifact
            for artifact in self.artifacts.values()
            if not vm_id or artifact.vm_id == vm_id
        ]
        items.sort(key=lambda artifact: artifact.created_at, reverse=True)
        ctx.entry["details"] = {"count": len(items)}
        return {"count": len(items), "artifacts": [artifact.model_dump() for artifact in items]}
def artifact_by_id(self, artifact_id: str) -> ArtifactMetadata:
    """Return the stored metadata for *artifact_id*.

    Raises:
        LabError: ``artifact_not_found`` when the id is unknown.
    """
    found = self.artifacts.get(artifact_id)
    if found:
        return found
    raise LabError(code="artifact_not_found", message=f"Artifact not found: {artifact_id}")
def _register_artifact(
    self,
    *,
    path: Path,
    label: str,
    vm_id: str | None,
    provenance: dict[str, Any],
    mime_type: str = "application/octet-stream",
) -> ArtifactMetadata:
    """Record *path* as a new artifact (size + sha256) and persist the index.

    Raises:
        LabError: ``artifact_missing_file`` when *path* does not exist.
    """
    require(path.exists(), "artifact_missing_file", "Artifact file does not exist.", path=str(path))
    new_id = f"art-{uuid.uuid4().hex[:12]}"
    record = ArtifactMetadata(
        artifact_id=new_id,
        vm_id=vm_id,
        label=label,
        path=path_str(path),
        size=path.stat().st_size,
        sha256=compute_sha256(path),
        mime_type=mime_type,
        provenance=provenance,
        resource_uri=f"artifact://{new_id}",
    )
    self.artifacts[new_id] = record
    self.workspace.save_artifacts(self.artifacts)
    return record
def _resolve_base_image(self, source: str, timeout_sec: float = 1800.0) -> Path:
    """Resolve a base-image *source* to a local path.

    ``source`` is either an http(s) URL — downloaded once into the workspace
    image cache, with SHA256SUMS validation when a sums file is published
    next to it — or an existing local filesystem path.

    Raises LabError on download timeout, checksum mismatch, or a missing
    local file; HTTP/network errors from ``requests`` propagate.
    """
    parsed = urlparse(source)
    if parsed.scheme not in {"http", "https"}:
        # Local file: just validate existence.
        local_path = Path(source)
        require(local_path.exists(), "base_image_missing", "base image path does not exist.", path=source)
        return local_path
    filename = Path(parsed.path).name
    require(bool(filename), "invalid_base_image_url", "base_image_source URL must include a file name.")
    target_path = self.workspace.base_images_dir / filename
    if target_path.exists():
        # Cache hit: reuse the previously downloaded image.
        return target_path
    tmp_path = target_path.with_suffix(target_path.suffix + ".part")
    deadline = time.time() + timeout_sec
    try:
        with requests.get(source, stream=True, timeout=60) as response:
            response.raise_for_status()
            with tmp_path.open("wb") as fp:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    # Enforce the overall download budget per chunk.
                    if time.time() > deadline:
                        raise LabError(
                            code="base_image_download_timeout",
                            message="Timed out downloading base image.",
                            details={"url": source, "timeout_sec": timeout_sec},
                        )
                    if chunk:
                        fp.write(chunk)
        expected = self._lookup_remote_sha256(source, filename)
        if expected:
            actual = compute_sha256(tmp_path)
            if actual.lower() != expected.lower():
                raise LabError(
                    code="checksum_mismatch",
                    message="Base image checksum validation failed.",
                    details={"expected": expected, "actual": actual, "url": source},
                )
    except BaseException:
        # BUG FIX: the old code only removed the partial file on the timeout
        # and checksum-mismatch paths; a network/HTTP error (or Ctrl-C) left
        # a stale .part file behind. Clean up on every failure before
        # re-raising; unlink(missing_ok=True) is idempotent.
        tmp_path.unlink(missing_ok=True)
        raise
    # Atomic publish: only a fully validated file becomes the cache entry.
    tmp_path.replace(target_path)
    return target_path
def _lookup_remote_sha256(self, source_url: str, filename: str) -> str | None:
    """Return the expected SHA-256 for *filename* from the SHA256SUMS file
    published next to *source_url*, or None when unavailable.

    SHA256SUMS lines have the coreutils format ``<hex-digest>  <name>`` or
    ``<hex-digest> *<name>`` (the ``*`` marks binary mode).
    """
    base = source_url.rsplit("/", 1)[0]
    sums_url = f"{base}/SHA256SUMS"
    try:
        response = requests.get(sums_url, timeout=20)
        if response.status_code != 200:
            return None
        for line in response.text.splitlines():
            line = line.strip()
            if not line or filename not in line:
                continue
            parts = line.split()
            # Need at least <digest> <name>; a single-token line could
            # otherwise be returned as if its name were a digest.
            if len(parts) < 2:
                continue
            # BUG FIX: the old check compared against the literal string
            # "*(unknown)" (a templating artifact) instead of f"*{filename}",
            # and endswith() could match an unrelated file whose name merely
            # ends with *filename*. Match the entry name exactly, allowing
            # the optional "*" binary-mode prefix.
            if parts[-1] in (filename, f"*{filename}"):
                return parts[0]
    except Exception:
        # Best-effort lookup: any network/parse error simply disables
        # checksum validation rather than failing the download.
        return None
    return None
def _create_overlay(self, base_image: Path, overlay_path: Path) -> None:
    """Create a fresh qcow2 overlay backed by *base_image*, replacing any existing one."""
    qemu_img = self.bootstrap.paths.get("qemu-img") or find_executable(["qemu-img.exe", "qemu-img"])
    require(bool(qemu_img), "dependency_missing", "qemu-img is not available.")
    if overlay_path.exists():
        overlay_path.unlink()
    create_cmd = [qemu_img, "create", "-f", "qcow2", "-F", "qcow2", "-b", str(base_image), str(overlay_path)]
    run_command(create_cmd, timeout=60.0, check=True)
def _ensure_ssh_keypair(self) -> tuple[Path, Path]:
    """Return (private, public) key paths, generating an ed25519 pair on first use."""
    priv = self.workspace.keys_dir / "id_ed25519"
    pub = self.workspace.keys_dir / "id_ed25519.pub"
    if not (priv.exists() and pub.exists()):
        ssh_keygen = self.bootstrap.paths.get("ssh-keygen") or find_executable(["ssh-keygen.exe", "ssh-keygen"])
        require(bool(ssh_keygen), "dependency_missing", "ssh-keygen is not available.")
        # -N "" produces an unencrypted key; the workspace keys dir is the trust boundary.
        run_command(
            [ssh_keygen, "-t", "ed25519", "-N", "", "-f", str(priv), "-C", "mcp-qemu-lab"],
            timeout=30.0,
            check=True,
        )
    return priv, pub
def _create_cloud_init_seed(self, *, vm_id: str, vm_name: str, seed_iso_path: Path, ssh_pubkey: str) -> None:
    """Build the NoCloud cloud-init seed ISO for a VM.

    Writes an ISO labeled "cidata" containing ``user-data`` (a cloud-config
    that provisions a passwordless-sudo ``lab`` user keyed with *ssh_pubkey*
    and installs sshd plus the gdb toolchain) and ``meta-data`` (instance id
    and hostname). Any existing seed at *seed_iso_path* is replaced.
    """
    logger.info("creating cloud-init seed for vm_id=%s", vm_id)
    # cloud-config payload consumed by cloud-init's NoCloud datasource.
    # NOTE(review): YAML nesting below reconstructed to the conventional
    # cloud-config layout — confirm against the committed file.
    user_data = f"""#cloud-config
users:
  - name: lab
    groups: [sudo]
    shell: /bin/bash
    sudo: "ALL=(ALL) NOPASSWD:ALL"
    ssh_authorized_keys:
      - {ssh_pubkey}
package_update: true
packages:
  - openssh-server
  - gdb
  - procps
  - coreutils
  - gdbserver
runcmd:
  - [ systemctl, enable, --now, ssh ]
"""
    meta_data = f"""instance-id: {vm_id}
local-hostname: {vm_name}
"""
    # Replace any stale seed before writing the new one.
    if seed_iso_path.exists():
        seed_iso_path.unlink()
    iso = pycdlib.PyCdlib()
    # Volume label "cidata" is what cloud-init's NoCloud datasource scans for.
    iso.new(interchange_level=3, vol_ident="cidata", joliet=1, rock_ridge="1.09")
    self._iso_add_text_file(iso, "user-data", user_data)
    self._iso_add_text_file(iso, "meta-data", meta_data)
    iso.write(str(seed_iso_path))
    iso.close()
def _iso_add_text_file(self, iso: pycdlib.PyCdlib, name: str, text: str) -> None:
    """Add *text* to *iso* under ISO9660, Rock Ridge, and Joliet names."""
    payload = text.encode("utf-8")
    # ISO9660 primary names: upper-case, '-' replaced, and a ";1" version suffix.
    primary_name = "/" + name.upper().replace("-", "_") + ";1"
    iso.add_fp(io.BytesIO(payload), len(payload), iso_path=primary_name, rr_name=name, joliet_path=f"/{name}")
def _qemu_binary_for_arch(self, arch: VmArch) -> str:
    """Return the QEMU system-emulator binary for *arch* (only x86_64 is supported)."""
    if arch != VmArch.X86_64:
        raise LabError(code="unsupported_arch", message=f"Unsupported arch: {arch.value}")
    candidates = ["qemu-system-x86_64.exe", "qemu-system-x86_64"]
    qemu = self.bootstrap.paths.get("qemu-system-x86_64") or find_executable(candidates)
    require(bool(qemu), "dependency_missing", "qemu-system-x86_64 is not available.")
    return qemu
def _cleanup_dead_runtime_entries(self) -> None:
    """Drop runtime records whose QEMU process is no longer alive."""
    with self.workspace.edit_vm_runtime() as table:
        dead_ids = [key for key, rec in table.items() if not _is_pid_running(rec.pid)]
        for dead_id in dead_ids:
            table.pop(dead_id, None)
        # Keep the in-memory cache in sync with what will be persisted.
        self.vm_runtime = dict(table)
    if dead_ids:
        logger.info("removed dead runtime entries: %s", ",".join(dead_ids))
def _refresh_vm_runtime(self) -> None:
    """Reload the persisted VM runtime table into the in-memory cache."""
    self.vm_runtime = self.workspace.load_vm_runtime()
def _upsert_runtime(self, runtime: VmRuntime) -> None:
    """Insert or replace one VM's runtime record and refresh the cached copy."""
    with self.workspace.edit_vm_runtime() as table:
        table[runtime.vm_id] = runtime
        self.vm_runtime = dict(table)
def _remove_runtime(self, vm_id: str) -> None:
    """Remove a VM's runtime record (no-op when absent) and refresh the cache."""
    with self.workspace.edit_vm_runtime() as table:
        table.pop(vm_id, None)
        self.vm_runtime = dict(table)
def _ensure_known_hosts_file(self) -> None:
    """Create an empty known_hosts file in the keys directory when missing."""
    hosts_path = self.workspace.keys_dir / "known_hosts"
    if not hosts_path.exists():
        hosts_path.write_text("", encoding="utf-8")
def _guest_connection(self, vm_id: str) -> GuestConnection:
    """Build an SSH connection descriptor for a running VM.

    Requires the VM to be defined, its process alive, and a user-mode
    networking SSH forward to be present.
    """
    definition = self._require_vm(vm_id)
    rt = self._require_runtime(vm_id)
    require(
        rt.ssh_host is not None and rt.ssh_port is not None,
        "ssh_unavailable",
        "SSH endpoint is not available for this VM. Create/start with net_mode='user'.",
    )
    return GuestConnection(
        host=rt.ssh_host,
        port=rt.ssh_port,
        user=definition.ssh_user,
        private_key_path=definition.ssh_private_key_path,
        known_hosts_path=str(self.workspace.keys_dir / "known_hosts"),
    )
def _require_timeout(self, name: str, value: int | float, min_value: float = 0.1) -> float:
    """Validate *value* as a timeout (numeric and >= *min_value*); return it as float.

    Raises LabError(code="invalid_timeout") otherwise.
    """
    try:
        seconds = float(value)
    except (TypeError, ValueError) as exc:
        raise LabError(
            code="invalid_timeout",
            message=f"{name} must be numeric.",
            details={"timeout_name": name, "timeout_sec": value},
        ) from exc
    require(
        seconds >= min_value,
        "invalid_timeout",
        f"{name} must be >= {min_value}.",
        timeout_name=name,
        timeout_sec=value,
        min_value=min_value,
    )
    return seconds
def _wait_for_guest_ssh_ready(self, conn: GuestConnection, timeout_sec: float, poll_sec: float = 2.0) -> dict[str, Any]:
    """Poll the guest over SSH until a trivial command succeeds or the deadline passes."""
    deadline = time.time() + timeout_sec
    attempts = 0
    last_stderr = ""
    while time.time() < deadline:
        attempts += 1
        # Cap each probe at 15s, but never exceed the remaining budget.
        per_attempt_timeout = min(15.0, max(1.0, deadline - time.time()))
        probe = run_ssh_command(conn, ["/bin/true"], timeout=per_attempt_timeout)
        if probe.exit_code == 0:
            return {"attempts": attempts, "stderr": probe.stderr}
        if probe.stderr:
            last_stderr = probe.stderr
        logger.debug(
            "wait_for_guest_ssh_ready attempt=%s exit_code=%s timed_out=%s stderr=%s",
            attempts,
            probe.exit_code,
            probe.timed_out,
            probe.stderr[:200],
        )
        time.sleep(min(poll_sec, max(0.2, deadline - time.time())))
    raise LabError(
        code="guest_ssh_timeout",
        message="Timed out waiting for guest SSH readiness.",
        details={"timeout_sec": timeout_sec, "attempts": attempts, "last_stderr": last_stderr},
    )
def _wait_for_cloud_init(self, conn: GuestConnection, timeout_sec: float) -> dict[str, Any]:
    """Block until cloud-init reports completion inside the guest.

    Guests without cloud-init installed pass immediately.
    """
    wait_timeout = self._require_timeout("cloud_init_timeout_sec", timeout_sec, min_value=1.0)
    script = "if command -v cloud-init >/dev/null 2>&1; then cloud-init status --wait; else echo cloud-init-not-installed; fi"
    outcome = run_ssh_command(conn, ["sh", "-lc", script], timeout=wait_timeout)
    if outcome.timed_out:
        raise LabError(
            code="guest_cloud_init_timeout",
            message="Timed out waiting for cloud-init finalization.",
            details={"timeout_sec": wait_timeout},
        )
    if outcome.exit_code != 0:
        raise LabError(
            code="guest_cloud_init_failed",
            message="cloud-init status --wait failed in guest.",
            details={"exit_code": outcome.exit_code, "stderr": outcome.stderr, "stdout": outcome.stdout},
        )
    text = outcome.stdout.strip()
    summary = text.splitlines()[-1] if text else "done"
    return {"status": "done", "summary": summary}
def _guest_command_exists(self, conn: GuestConnection, command: str, timeout_sec: float) -> bool:
    """Return True when *command* resolves on the guest's PATH."""
    probe = run_ssh_command(conn, ["sh", "-lc", f"command -v {command} >/dev/null 2>&1"], timeout=timeout_sec)
    return probe.exit_code == 0
def _ensure_guest_command(self, conn: GuestConnection, command: str, timeout_sec: float) -> None:
    """Raise guest_dependency_missing unless *command* exists in the guest."""
    # Probe with a clamped timeout so a slow guest can't eat the whole budget.
    probe_timeout = min(20.0, max(1.0, timeout_sec))
    require(
        self._guest_command_exists(conn, command, timeout_sec=probe_timeout),
        "guest_dependency_missing",
        f"Guest dependency '{command}' is not available. Wait for cloud-init or verify guest package installation.",
        command=command,
    )
def _ensure_guest_debug_tools(
    self,
    vm_id: str,
    conn: GuestConnection,
    *,
    timeout_sec: float,
    wait_for_cloud_init: bool,
) -> None:
    """Wait for guest SSH (and optionally cloud-init), then assert gdb is installed.

    All phases share one total time budget derived from *timeout_sec*.
    """
    total_timeout = self._require_timeout("debug_tools_timeout_sec", timeout_sec, min_value=1.0)
    deadline = time.time() + total_timeout
    self._wait_for_guest_ssh_ready(conn, timeout_sec=total_timeout, poll_sec=2.0)
    if wait_for_cloud_init:
        budget = max(1.0, deadline - time.time())
        logger.info("ensuring guest debug tools vm_id=%s waiting for cloud-init remaining=%s", vm_id, budget)
        self._wait_for_cloud_init(conn, timeout_sec=budget)
    budget = max(1.0, deadline - time.time())
    logger.debug("ensuring guest debug tools vm_id=%s checking gdb remaining=%s", vm_id, budget)
    self._ensure_guest_command(conn, "gdb", timeout_sec=budget)
def _ensure_guest_pid_running(self, conn: GuestConnection, pid: int, timeout_sec: float) -> None:
    """Raise process_not_running unless *pid* is alive in the guest (kill -0 probe)."""
    check_timeout = self._require_timeout("pid_check_timeout_sec", timeout_sec, min_value=1.0)
    probe = run_ssh_command(conn, ["sh", "-lc", f"kill -0 {int(pid)} >/dev/null 2>&1"], timeout=check_timeout)
    require(
        probe.exit_code == 0,
        "process_not_running",
        "Target process is not running in guest.",
        pid=pid,
        stderr=probe.stderr,
        stdout=probe.stdout,
    )
def _wait_for_file_stable(
    self,
    path: Path,
    *,
    timeout_sec: float,
    poll_sec: float = 1.0,
    stable_polls: int = 2,
) -> None:
    """Wait until *path* exists, is non-empty, and its size is unchanged for
    *stable_polls* consecutive polls; raise artifact_write_timeout otherwise."""
    wait_timeout = self._require_timeout("artifact_wait_timeout_sec", timeout_sec, min_value=0.5)
    deadline = time.time() + wait_timeout
    previous_size = -1
    unchanged = 0
    while time.time() < deadline:
        if path.exists():
            current = path.stat().st_size
            if current > 0 and current == previous_size:
                unchanged += 1
                if unchanged >= stable_polls:
                    logger.info("artifact file stable path=%s size=%s", path, current)
                    return
            else:
                # Size moved (or file still empty): restart the stability count.
                unchanged = 0
            previous_size = current
            logger.debug("waiting for artifact path=%s size=%s stable_count=%s", path, current, unchanged)
        time.sleep(min(poll_sec, max(0.1, deadline - time.time())))
    raise LabError(
        code="artifact_write_timeout",
        message="Timed out waiting for artifact file to be written.",
        details={"path": str(path), "timeout_sec": wait_timeout},
    )
def _resolve_guest_core_path(
    self,
    conn: GuestConnection,
    *,
    requested_path: str,
    gdb_stdout: str,
    gdb_stderr: str,
    timeout_sec: float,
) -> str:
    """Locate the core file gcore actually wrote inside the guest.

    Checks, in order: the requested path, any paths gdb reported via a
    "Saved corefile <path>" line, and finally a prefix glob of the
    requested path. Raises guest_core_not_found when nothing matches.
    """
    resolve_timeout = self._require_timeout("resolve_core_timeout_sec", timeout_sec, min_value=1.0)
    combined = "\n".join([gdb_stdout or "", gdb_stderr or ""])
    candidates: list[str] = [requested_path]
    for raw_line in combined.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        match = re.search(r"Saved corefile\s+(.+)$", stripped)
        if match:
            reported = match.group(1).strip().strip("\"'`")
            if reported:
                candidates.append(reported)
    # Order-preserving dedup.
    deduped = list(dict.fromkeys(candidates))
    per_check_timeout = min(10.0, resolve_timeout)
    for candidate in deduped:
        probe = run_ssh_command(conn, ["sh", "-lc", f"test -f {shell_join([candidate])}"], timeout=per_check_timeout)
        if probe.exit_code == 0:
            logger.debug("resolved guest core path requested=%s resolved=%s", requested_path, candidate)
            return candidate
    # Last resort: gcore sometimes appends the pid to the requested name.
    glob_result = run_ssh_command(
        conn,
        ["sh", "-lc", f"ls -1 {requested_path}* 2>/dev/null | head -n 1"],
        timeout=per_check_timeout,
    )
    if glob_result.exit_code == 0 and glob_result.stdout.strip():
        resolved = glob_result.stdout.strip().splitlines()[0].strip()
        logger.debug("resolved guest core path by glob requested=%s resolved=%s", requested_path, resolved)
        return resolved
    raise LabError(
        code="guest_core_not_found",
        message="gcore completed but no core file was found in guest.",
        details={
            "requested_path": requested_path,
            "candidates": deduped,
            "gdb_stdout": gdb_stdout,
            "gdb_stderr": gdb_stderr,
        },
    )
def _gdb_attach_failed(self, stdout: str, stderr: str) -> bool:
combined = f"{stdout}\n{stderr}".lower()
markers = [
"could not attach to process",
"you can't do that without a process to debug",
"ptrace:",
"operation not permitted",
"inappropriate ioctl for device",
"the program is not being run",
]
return any(marker in combined for marker in markers)
def _run_gdb_with_optional_sudo(self, conn: GuestConnection, gdb_cmd: list[str], *, timeout_sec: float) -> GuestExecResult:
    """Run *gdb_cmd* over SSH, retrying once with passwordless sudo when attach fails.

    Returns the escalated result only when it succeeded; otherwise the
    direct attempt's result is returned for diagnostics.
    """
    command_timeout = self._require_timeout("gdb_command_timeout_sec", timeout_sec, min_value=1.0)
    direct = run_ssh_command(conn, gdb_cmd, timeout=command_timeout)
    if direct.timed_out:
        return direct
    if direct.exit_code == 0 and not self._gdb_attach_failed(direct.stdout, direct.stderr):
        return direct
    # Attaching may need elevated privileges (e.g. ptrace_scope); retry non-interactively.
    escalated = run_ssh_command(conn, ["sudo", "-n", *gdb_cmd], timeout=command_timeout)
    if not escalated.timed_out:
        logger.debug(
            "gdb sudo fallback used exit_code=%s attach_failed=%s",
            escalated.exit_code,
            self._gdb_attach_failed(escalated.stdout, escalated.stderr),
        )
    escalated_ok = escalated.exit_code == 0 and not self._gdb_attach_failed(escalated.stdout, escalated.stderr)
    return escalated if escalated_ok else direct
def _wait_for_qmp(self, runtime: VmRuntime, timeout_sec: float = 30.0, qmp_timeout_sec: float = 10.0) -> None:
    """Retry connecting to the VM's QMP socket until it answers or the deadline passes."""
    deadline = time.time() + timeout_sec
    last_error: Exception | None = None
    attempts = 0
    while time.time() < deadline:
        attempts += 1
        try:
            # Opening the client is the readiness probe; we discard the connection.
            with self._qmp(runtime, timeout_sec=qmp_timeout_sec):
                logger.debug("qmp ready vm_id=%s attempts=%s", runtime.vm_id, attempts)
                return
        except Exception as exc:
            last_error = exc
            logger.debug("qmp wait retry vm_id=%s attempts=%s error=%s", runtime.vm_id, attempts, exc)
        time.sleep(0.5)
    raise LabError(
        code="qmp_timeout",
        message="Timed out waiting for QMP endpoint.",
        details={
            "vm_id": runtime.vm_id,
            "endpoint": f"{runtime.qmp_host}:{runtime.qmp_port}",
            "attempts": attempts,
            "error": str(last_error),
        },
    )
def _qmp(self, runtime: VmRuntime, timeout_sec: float = 10.0) -> QmpClient:
    """Build a QMP client for the VM's control endpoint."""
    return QmpClient(QmpEndpoint(host=runtime.qmp_host, port=runtime.qmp_port), timeout=timeout_sec)
def _qmp_status(self, runtime: VmRuntime, timeout_sec: float = 10.0) -> str:
    """Best-effort VM status via QMP; returns "unknown" on any failure."""
    try:
        with self._qmp(runtime, timeout_sec=timeout_sec) as qmp:
            return str(qmp.query_status().get("status", "unknown"))
    except Exception:
        # Deliberate broad catch: status is advisory and must never raise.
        return "unknown"
def _require_vm(self, vm_id: str) -> VmDefinition:
    """Return the VM definition for *vm_id* or raise vm_not_found."""
    definition = self.vm_definitions.get(vm_id)
    if definition:
        return definition
    raise LabError(code="vm_not_found", message=f"VM not found: {vm_id}")
def _require_runtime(self, vm_id: str) -> VmRuntime:
    """Return the live runtime record for *vm_id*, pruning a stale entry if found."""
    self._refresh_vm_runtime()
    rt = self.vm_runtime.get(vm_id)
    if not rt:
        raise LabError(code="vm_not_running", message=f"VM is not running: {vm_id}")
    if not _is_pid_running(rt.pid):
        # The QEMU process died since the record was written; drop it.
        self._remove_runtime(vm_id)
        raise LabError(code="vm_not_running", message=f"VM process is not running: {vm_id}")
    return rt
def _require_session(self, session_id: str) -> DebuggerSession:
    """Return an active debugger session or raise the matching LabError."""
    sess = self.debug_sessions.get(session_id)
    if not sess:
        raise LabError(code="debug_session_not_found", message=f"Debugger session not found: {session_id}")
    if not sess.active:
        raise LabError(code="debug_session_inactive", message=f"Debugger session is inactive: {session_id}")
    return sess
@contextmanager
def _audit(
    self,
    tool: str,
    vm_id: str | None = None,
    pid: int | None = None,
    command: str | None = None,
) -> Iterator[AuditContext]:
    """Record one audit-log entry per tool invocation.

    The entry is always persisted (in ``finally``), carrying status "ok" on
    success or "error" plus structured error details when the body raises;
    the exception is re-raised either way.
    """
    record: dict[str, Any] = {
        "timestamp": utc_now_iso(),
        "tool": tool,
        "vm_id": vm_id,
        "pid": pid,
        "command": command,
        "artifacts": [],
    }
    try:
        yield AuditContext(tool=tool, entry=record)
    except LabError as exc:
        record["status"] = "error"
        record["error"] = exc.to_dict()
        raise
    except Exception as exc:
        record["status"] = "error"
        record["error"] = {"code": "unexpected_error", "message": str(exc), "details": {}}
        raise
    else:
        record["status"] = "ok"
    finally:
        self.workspace.write_audit_entry(record)
def _format_ssh_endpoint(self, runtime: VmRuntime) -> str | None:
if runtime.ssh_host and runtime.ssh_port:
return f"{runtime.ssh_host}:{runtime.ssh_port}"
return None