from __future__ import annotations
import json
import logging
import os
import sys
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Literal
from fastmcp import FastMCP
from fastmcp.exceptions import ResourceError, ToolError
from .lab import DEFAULT_UBUNTU_IMAGE, QemuLab
from .models import NetMode, StopMode, VmArch
from .util import LabError
def configure_logging(workspace_root: Path | None) -> None:
log_level = os.environ.get("MCP_QEMU_LAB_LOG_LEVEL", "INFO").upper()
handlers: list[logging.Handler] = [logging.StreamHandler(stream=sys.stderr)]
if workspace_root:
log_dir = workspace_root / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_dir / "server.log", encoding="utf-8")
handlers.append(file_handler)
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
handlers=handlers,
force=True,
)
WORKSPACE_ROOT_ENV = "MCP_QEMU_LAB_WORKSPACE"
workspace_root = Path(os.environ[WORKSPACE_ROOT_ENV]) if WORKSPACE_ROOT_ENV in os.environ else None
configure_logging(workspace_root)
logger = logging.getLogger("mcp_qemu_lab")
LAB = QemuLab(workspace_root=workspace_root)
_DEPENDENCIES_ENSURED = False
def _as_tool_error(exc: Exception) -> ToolError:
if isinstance(exc, LabError):
payload = exc.to_dict()
else:
payload = {"code": "unexpected_error", "message": str(exc), "details": {}}
return ToolError(json.dumps(payload, sort_keys=True))
def _tool_guard(fn: Callable[..., Any]) -> Callable[..., Any]:
@wraps(fn)
def wrapper(*args: Any, **kwargs: Any) -> Any:
global _DEPENDENCIES_ENSURED
logger.info("tool_call_start tool=%s kwargs=%s", fn.__name__, kwargs)
try:
if not _DEPENDENCIES_ENSURED and fn.__name__ != "ensure_dependencies":
LAB.ensure_dependencies()
_DEPENDENCIES_ENSURED = True
result = fn(*args, **kwargs)
logger.info("tool_call_end tool=%s status=ok", fn.__name__)
return result
except Exception as exc: # pragma: no cover - passthrough conversion
logger.exception("tool_call_end tool=%s status=error error=%s", fn.__name__, exc)
raise _as_tool_error(exc) from exc
return wrapper
mcp = FastMCP(
name="mcp-qemu-lab",
instructions=(
"Safe-by-default QEMU analysis server. Tools return JSON metadata only. "
"Artifacts are stored on disk and exposed via artifact:// resources."
),
version="0.1.0",
)
@mcp.tool(description="Checks and auto-installs host dependencies, prepares workspace, and records versions.")
@_tool_guard
def ensure_dependencies() -> dict[str, Any]:
global _DEPENDENCIES_ENSURED
result = LAB.ensure_dependencies()
_DEPENDENCIES_ENSURED = True
return result.model_dump()
@mcp.tool(description="Create a VM definition, disks, and cloud-init seed image.")
@_tool_guard
def vm_create(
name: str,
arch: Literal["x86_64"] = "x86_64",
cpu: int = 2,
mem_mb: int = 2048,
base_image_source: str = DEFAULT_UBUNTU_IMAGE,
net_mode: Literal["none", "user"] = "none",
enable_qmp: bool = True,
base_image_download_timeout_sec: int = 1800,
) -> dict[str, Any]:
return LAB.vm_create(
name=name,
arch=VmArch(arch),
cpu=cpu,
mem_mb=mem_mb,
base_image_source=base_image_source,
net_mode=NetMode(net_mode),
enable_qmp=enable_qmp,
base_image_download_timeout_sec=base_image_download_timeout_sec,
)
@mcp.tool(description="Start a VM and return QMP/SSH endpoints.")
@_tool_guard
def vm_start(vm_id: str, qmp_wait_timeout_sec: int = 120, qmp_command_timeout_sec: int = 15) -> dict[str, Any]:
return LAB.vm_start(
vm_id,
qmp_wait_timeout_sec=qmp_wait_timeout_sec,
qmp_command_timeout_sec=qmp_command_timeout_sec,
).model_dump()
@mcp.tool(description="Get VM runtime/QMP/SSH status and log file paths.")
@_tool_guard
def vm_status(vm_id: str, qmp_command_timeout_sec: int = 10) -> dict[str, Any]:
return LAB.vm_status(vm_id, qmp_command_timeout_sec=qmp_command_timeout_sec)
@mcp.tool(description="Stop a VM gracefully or forcefully.")
@_tool_guard
def vm_stop(
vm_id: str,
mode: Literal["graceful", "force"] = "graceful",
graceful_timeout_sec: int = 45,
force_timeout_sec: int = 15,
qmp_command_timeout_sec: int = 10,
) -> dict[str, Any]:
return LAB.vm_stop(
vm_id,
StopMode(mode),
graceful_timeout_sec=graceful_timeout_sec,
force_timeout_sec=force_timeout_sec,
qmp_command_timeout_sec=qmp_command_timeout_sec,
).model_dump()
@mcp.tool(description="Save a VM snapshot by name via QMP HMP bridge.")
@_tool_guard
def vm_snapshot_save(vm_id: str, snapshot_name: str, timeout_sec: int = 120) -> dict[str, Any]:
return LAB.vm_snapshot_save(vm_id, snapshot_name, timeout_sec=timeout_sec)
@mcp.tool(description="Load a VM snapshot by name via QMP HMP bridge.")
@_tool_guard
def vm_snapshot_load(vm_id: str, snapshot_name: str, timeout_sec: int = 120) -> dict[str, Any]:
return LAB.vm_snapshot_load(vm_id, snapshot_name, timeout_sec=timeout_sec)
@mcp.tool(description="Execute an allowlisted guest command over SSH, with optional per-call unsafe override.")
@_tool_guard
def guest_exec(
vm_id: str,
allowed_command: str,
args: list[str] | None = None,
unsafe_allow_arbitrary_commands: bool = False,
unsafe_command: str | None = None,
timeout_sec: int = 120,
) -> dict[str, Any]:
return LAB.guest_exec(
vm_id=vm_id,
allowed_command=allowed_command,
args=args,
unsafe_allow_arbitrary_commands=unsafe_allow_arbitrary_commands,
unsafe_command=unsafe_command,
timeout_sec=timeout_sec,
).model_dump()
@mcp.tool(description="Copy a file artifact out of the guest and return artifact metadata.")
@_tool_guard
def guest_copy_out(vm_id: str, guest_path: str, host_subdir: str = "copied", timeout_sec: int = 180) -> dict[str, Any]:
return LAB.guest_copy_out(vm_id, guest_path, host_subdir, timeout_sec=timeout_sec)
@mcp.tool(description="Copy a host file into the guest filesystem via SCP.")
@_tool_guard
def guest_copy_in(vm_id: str, host_path: str, guest_path: str, timeout_sec: int = 180) -> dict[str, Any]:
return LAB.guest_copy_in(vm_id=vm_id, host_path=host_path, guest_path=guest_path, timeout_sec=timeout_sec)
@mcp.tool(description="List guest processes with pid, user, and cmdline.")
@_tool_guard
def process_list(vm_id: str, filter: str | None = None, timeout_sec: int = 30) -> dict[str, Any]:
return LAB.process_list(vm_id, filter, timeout_sec=timeout_sec)
@mcp.tool(description="Return parsed /proc/<pid>/maps for a guest process.")
@_tool_guard
def process_maps(vm_id: str, pid: int, timeout_sec: int = 30) -> dict[str, Any]:
return LAB.process_maps(vm_id, pid, timeout_sec=timeout_sec)
@mcp.tool(description="Generate a process core dump in guest via gdb/gcore and copy artifact out.")
@_tool_guard
def process_dump_core(
vm_id: str,
pid: int,
reason_label: str | None = None,
gdb_timeout_sec: int = 600,
copy_timeout_sec: int = 300,
) -> dict[str, Any]:
return LAB.process_dump_core(
vm_id,
pid,
reason_label,
gdb_timeout_sec=gdb_timeout_sec,
copy_timeout_sec=copy_timeout_sec,
)
@mcp.tool(description="Dump full guest memory to a host artifact file via QMP.")
@_tool_guard
def guest_dump_memory(
vm_id: str,
output_label: str | None = None,
compress: bool = True,
timeout_sec: int = 1800,
) -> dict[str, Any]:
return LAB.guest_dump_memory(vm_id, output_label, compress, timeout_sec=timeout_sec)
@mcp.tool(description="Create an in-guest debugger session handle for a target PID.")
@_tool_guard
def debugger_attach(vm_id: str, pid: int, ready_timeout_sec: int = 600, pid_check_timeout_sec: int = 30) -> dict[str, Any]:
return LAB.debugger_attach(
vm_id,
pid,
ready_timeout_sec=ready_timeout_sec,
pid_check_timeout_sec=pid_check_timeout_sec,
)
@mcp.tool(description="Set breakpoint (symbol or address) for a debugger session.")
@_tool_guard
def debugger_set_breakpoint(session_id: str, location: str) -> dict[str, Any]:
return LAB.debugger_set_breakpoint(session_id, location)
@mcp.tool(description="Continue execution for debugger session.")
@_tool_guard
def debugger_continue(session_id: str, timeout_sec: int = 120) -> dict[str, Any]:
return LAB.debugger_continue(session_id, timeout_sec)
@mcp.tool(description="Read process registers via in-guest gdb attach.")
@_tool_guard
def debugger_read_registers(session_id: str, timeout_sec: int = 60) -> dict[str, Any]:
return LAB.debugger_read_registers(session_id, timeout_sec=timeout_sec)
@mcp.tool(description="Detach and mark debugger session inactive.")
@_tool_guard
def debugger_detach(session_id: str) -> dict[str, Any]:
return LAB.debugger_detach(session_id)
@mcp.tool(description="List artifacts with hashes and provenance.")
@_tool_guard
def artifacts_list(vm_id: str | None = None) -> dict[str, Any]:
return LAB.artifacts_list(vm_id)
@mcp.tool(description="Fetch recent QEMU/serial/debug log tails and related audit lines for a VM.")
@_tool_guard
def vm_logs_tail(
vm_id: str,
lines: int = 200,
include_serial: bool = True,
include_debug: bool = True,
include_audit_lines: int = 50,
) -> dict[str, Any]:
return LAB.vm_logs_tail(
vm_id=vm_id,
lines=lines,
include_serial=include_serial,
include_debug=include_debug,
include_audit_lines=include_audit_lines,
)
@mcp.tool(description="Wait for guest SSH/cloud-init readiness and optional debugger dependencies (gdb).")
@_tool_guard
def guest_wait_ready(
vm_id: str,
timeout_sec: int = 900,
poll_interval_sec: int = 5,
wait_for_cloud_init: bool = True,
require_gdb: bool = True,
) -> dict[str, Any]:
return LAB.guest_wait_ready(
vm_id=vm_id,
timeout_sec=timeout_sec,
poll_interval_sec=poll_interval_sec,
wait_for_cloud_init=wait_for_cloud_init,
require_gdb=require_gdb,
)
@mcp.resource(
"artifact://{artifact_id}",
name="Artifact Resource",
description="Binary artifact file by artifact id.",
mime_type="application/octet-stream",
)
def artifact_resource(artifact_id: str) -> bytes:
try:
artifact = LAB.artifact_by_id(artifact_id)
path = Path(artifact.path)
return path.read_bytes()
except Exception as exc:
payload = exc.to_dict() if isinstance(exc, LabError) else {"message": str(exc)}
raise ResourceError(json.dumps(payload, sort_keys=True)) from exc
@mcp.resource(
"artifact-index://all",
name="Artifact Index",
description="JSON index of all artifact metadata records.",
mime_type="application/json",
)
def artifact_index_resource() -> dict[str, Any]:
return LAB.artifacts_list()
def main() -> None:
try:
LAB.ensure_dependencies()
global _DEPENDENCIES_ENSURED
_DEPENDENCIES_ENSURED = True
except Exception as exc: # pragma: no cover - startup fallback
logger.exception("Dependency bootstrap on startup failed; tools will retry lazily: %s", exc)
mcp.run(transport="stdio", show_banner=False)
if __name__ == "__main__":
main()