from __future__ import annotations
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
def utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class NetMode(str, Enum):
NONE = "none"
USER = "user"
class VmArch(str, Enum):
X86_64 = "x86_64"
class StopMode(str, Enum):
GRACEFUL = "graceful"
FORCE = "force"
class DependencyStatus(str, Enum):
PRESENT = "present"
INSTALLED = "installed"
MISSING = "missing"
class DependencyRecord(BaseModel):
name: str
status: DependencyStatus
path: str | None = None
version: str | None = None
details: str | None = None
class EnsureDependenciesResult(BaseModel):
workspace: str
manifest_path: str
checked_at: str = Field(default_factory=utc_now_iso)
dependencies: list[DependencyRecord]
class VmDefinition(BaseModel):
vm_id: str
name: str
arch: VmArch = VmArch.X86_64
cpu: int = 2
mem_mb: int = 2048
base_image_source: str
base_image_path: str
overlay_path: str
seed_iso_path: str
net_mode: NetMode = NetMode.NONE
ssh_user: str = "lab"
ssh_private_key_path: str
ssh_public_key_path: str
created_at: str = Field(default_factory=utc_now_iso)
class VmRuntime(BaseModel):
vm_id: str
pid: int
qmp_host: str = "127.0.0.1"
qmp_port: int
ssh_host: str | None = None
ssh_port: int | None = None
started_at: str = Field(default_factory=utc_now_iso)
status: str = "running"
log_path: str
serial_log_path: str | None = None
debug_log_path: str | None = None
class VmStatusResult(BaseModel):
vm_id: str
status: str
qmp_endpoint: str | None = None
ssh_endpoint: str | None = None
pid: int | None = None
class ArtifactMetadata(BaseModel):
artifact_id: str
vm_id: str | None = None
label: str
path: str
size: int
sha256: str
mime_type: str = "application/octet-stream"
created_at: str = Field(default_factory=utc_now_iso)
provenance: dict[str, Any] = Field(default_factory=dict)
resource_uri: str
class GuestExecResult(BaseModel):
vm_id: str
command: str
args: list[str]
stdout: str
stderr: str
exit_code: int
timed_out: bool = False
executed_at: str = Field(default_factory=utc_now_iso)
class ProcessInfo(BaseModel):
pid: int
user: str
cmdline: str
class ProcessMapEntry(BaseModel):
start: str
end: str
perms: str
offset: str
dev: str
inode: int
path: str | None = None
class DebuggerSession(BaseModel):
session_id: str
vm_id: str
pid: int
created_at: str = Field(default_factory=utc_now_iso)
mode: str = "in_guest_stateless"
breakpoints: list[str] = Field(default_factory=list)
active: bool = True
class ToolError(BaseModel):
code: str
message: str
details: dict[str, Any] = Field(default_factory=dict)
def path_str(path: Path) -> str:
return str(path.resolve())