Skip to main content
Glama
test_transport_framing.py6.83 kB
from transport.legacy.unity_connection import UnityConnection import sys import json import struct import socket import threading import time import select from pathlib import Path import pytest # locate server src dynamically to avoid hardcoded layout assumptions ROOT = Path(__file__).resolve().parents[1] candidates = [ ROOT / "Server", ] SRC = next((p for p in candidates if p.exists()), None) if SRC is None: searched = "\n".join(str(p) for p in candidates) pytest.skip( "MCP for Unity server source not found. Tried:\n" + searched, allow_module_level=True, ) # Tests can now import directly from parent package def start_dummy_server(greeting: bytes, respond_ping: bool = False): """Start a minimal TCP server for handshake tests.""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("127.0.0.1", 0)) sock.listen(1) port = sock.getsockname()[1] ready = threading.Event() def _run(): ready.set() conn, _ = sock.accept() conn.settimeout(1.0) if greeting: conn.sendall(greeting) if respond_ping: try: # Read exactly n bytes helper def _read_exact(n: int) -> bytes: buf = b"" while len(buf) < n: chunk = conn.recv(n - len(buf)) if not chunk: break buf += chunk return buf header = _read_exact(8) if len(header) == 8: length = struct.unpack(">Q", header)[0] payload = _read_exact(length) if payload == b'{"type":"ping"}': resp = b'{"type":"pong"}' conn.sendall(struct.pack(">Q", len(resp)) + resp) except Exception: pass time.sleep(0.1) try: conn.close() except Exception: pass finally: sock.close() threading.Thread(target=_run, daemon=True).start() ready.wait() return port def start_handshake_enforcing_server(): """Server that drops connection if client sends data before handshake.""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("127.0.0.1", 0)) sock.listen(1) port = sock.getsockname()[1] ready = threading.Event() def _run(): ready.set() conn, _ = sock.accept() # If client sends any data before greeting, disconnect (poll briefly) try: conn.setblocking(False) deadline = time.time() + 0.15 # short, reduces race with legitimate clients while time.time() < deadline: r, _, _ = select.select([conn], [], [], 0.01) if r: try: peek = conn.recv(1, socket.MSG_PEEK) except BlockingIOError: peek = b"" except Exception: peek = b"\x00" if peek: conn.close() sock.close() return # No pre-handshake data observed; send greeting conn.setblocking(True) conn.sendall(b"MCP/0.1 FRAMING=1\n") time.sleep(0.1) finally: try: conn.close() finally: sock.close() threading.Thread(target=_run, daemon=True).start() ready.wait() return port def test_handshake_requires_framing(): port = start_dummy_server(b"MCP/0.1\n") conn = UnityConnection(host="127.0.0.1", port=port) assert conn.connect() is False assert conn.sock is None def test_small_frame_ping_pong(): port = start_dummy_server(b"MCP/0.1 FRAMING=1\n", respond_ping=True) conn = UnityConnection(host="127.0.0.1", port=port) try: assert conn.connect() is True assert conn.use_framing is True payload = b'{"type":"ping"}' conn.sock.sendall(struct.pack(">Q", len(payload)) + payload) resp = conn.receive_full_response(conn.sock) assert json.loads(resp.decode("utf-8"))["type"] == "pong" finally: conn.disconnect() def test_unframed_data_disconnect(): port = start_handshake_enforcing_server() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(("127.0.0.1", port)) sock.settimeout(1.0) sock.sendall(b"BAD") time.sleep(0.4) try: data = sock.recv(1024) assert data == b"" except (ConnectionResetError, ConnectionAbortedError): # Some platforms raise instead of returning empty bytes when the # server closes the connection after detecting pre-handshake data. pass finally: sock.close() def test_zero_length_payload_heartbeat(): # Server that sends handshake and a zero-length heartbeat frame followed by a pong payload import socket import struct import threading import time sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("127.0.0.1", 0)) sock.listen(1) port = sock.getsockname()[1] ready = threading.Event() def _run(): ready.set() conn, _ = sock.accept() try: conn.sendall(b"MCP/0.1 FRAMING=1\n") time.sleep(0.02) # Heartbeat frame (length=0) conn.sendall(struct.pack(">Q", 0)) time.sleep(0.02) # Real payload frame payload = b'{"type":"pong"}' conn.sendall(struct.pack(">Q", len(payload)) + payload) time.sleep(0.02) finally: try: conn.close() except Exception: pass sock.close() threading.Thread(target=_run, daemon=True).start() ready.wait() conn = UnityConnection(host="127.0.0.1", port=port) try: assert conn.connect() is True # Receive should skip heartbeat and return the pong payload (or empty if only heartbeats seen) resp = conn.receive_full_response(conn.sock) assert resp in (b'{"type":"pong"}', b"") finally: conn.disconnect() @pytest.mark.skip(reason="TODO: oversized payload should disconnect") def test_oversized_payload_rejected(): pass @pytest.mark.skip(reason="TODO: partial header/payload triggers timeout and disconnect") def test_partial_frame_timeout(): pass @pytest.mark.skip(reason="TODO: concurrency test with parallel tool invocations") def test_parallel_invocations_no_interleaving(): pass @pytest.mark.skip(reason="TODO: reconnection after drop mid-command") def test_reconnect_mid_command(): pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CoplayDev/unity-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server