We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kevin4562/QEMU-MCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
from __future__ import annotations
import json
import logging
import socket
import time
from dataclasses import dataclass
from typing import Any
from .util import LabError
logger = logging.getLogger(__name__)
@dataclass
class QmpEndpoint:
    """Host/port pair identifying a QMP endpoint reachable over TCP."""

    # Hostname or IP address to connect to.
    host: str
    # TCP port number to connect to.
    port: int

    @property
    def uri(self) -> str:
        """Return the endpoint rendered as ``tcp://<host>:<port>``."""
        host, port = self.host, self.port
        return f"tcp://{host}:{port}"
class QmpClient:
def __init__(self, endpoint: QmpEndpoint, timeout: float = 10.0):
self.endpoint = endpoint
self.timeout = timeout
self.sock: socket.socket | None = None
self.reader = None
self.writer = None
def __enter__(self) -> "QmpClient":
self.connect()
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()
def connect(self) -> None:
if self.sock is not None:
return
logger.debug("qmp_connect endpoint=%s timeout=%s", self.endpoint.uri, self.timeout)
sock = socket.create_connection((self.endpoint.host, self.endpoint.port), timeout=self.timeout)
sock.settimeout(self.timeout)
self.sock = sock
self.reader = sock.makefile("r", encoding="utf-8")
self.writer = sock.makefile("w", encoding="utf-8")
hello = self._read_message()
if "QMP" not in hello:
raise LabError(
code="qmp_invalid_hello",
message="QMP endpoint returned invalid handshake.",
details={"payload": hello},
)
self.execute("qmp_capabilities")
def close(self) -> None:
if self.writer:
self.writer.close()
if self.reader:
self.reader.close()
if self.sock:
self.sock.close()
self.sock = None
self.reader = None
self.writer = None
def _read_message(self) -> dict[str, Any]:
if not self.reader:
raise LabError(code="qmp_not_connected", message="QMP client is not connected.")
line = self.reader.readline()
if not line:
raise LabError(code="qmp_eof", message="QMP socket closed unexpectedly.")
try:
return json.loads(line)
except json.JSONDecodeError as exc:
raise LabError(
code="qmp_parse_error",
message="Failed to parse QMP response JSON.",
details={"line": line},
) from exc
def execute(self, command: str, arguments: dict[str, Any] | None = None) -> dict[str, Any]:
if not self.writer:
raise LabError(code="qmp_not_connected", message="QMP client is not connected.")
payload: dict[str, Any] = {"execute": command}
if arguments:
payload["arguments"] = arguments
self.writer.write(json.dumps(payload) + "\n")
self.writer.flush()
deadline = time.monotonic() + max(self.timeout, 0.1)
while time.monotonic() < deadline:
try:
message = self._read_message()
except (TimeoutError, OSError) as exc:
raise LabError(
code="qmp_timeout",
message=f"Timed out waiting for QMP response: {command}",
details={"command": command, "arguments": arguments or {}},
) from exc
if "event" in message:
logger.debug("qmp_event command=%s event=%s", command, message.get("event"))
continue
if "error" in message:
raise LabError(
code="qmp_command_failed",
message=f"QMP command failed: {command}",
details={"error": message["error"], "command": command, "arguments": arguments or {}},
)
returned = message.get("return", {})
logger.debug("qmp_command_ok command=%s returned=%s", command, str(returned)[:200])
return returned
raise LabError(
code="qmp_timeout",
message=f"Timed out waiting for QMP response: {command}",
details={"command": command, "arguments": arguments or {}},
)
def human_monitor_command(self, command_line: str) -> Any:
logger.debug("qmp_hmp command=%s", command_line)
return self.execute("human-monitor-command", {"command-line": command_line})
def query_status(self) -> dict[str, Any]:
return self.execute("query-status")