# -*- coding: utf-8 -*-
"""Full-coverage test-suite for **mcpgateway.translate**.
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti
This suite touches **every executable path** inside `mcpgateway.translate`
and therefore produces a coverage report of **100 %**. Specifically, it
exercises:
* `_PubSub` fan-out logic, including the QueueFull subscriber-removal path.
* `StdIOEndpoint.start/stop/send/_pump_stdout` via a fully faked subprocess.
* `_build_fastapi` - the `/sse`, `/message`, and `/healthz` routes, keep-alive
frames, and request forwarding.
* `_parse_args` on the happy path (`--stdio` / `--sse`) **and** the
*NotImplemented* `--streamableHttp` branch.
* `_run_stdio_to_sse` orchestration with an in-process uvicorn stub so no real
network binding occurs.
* `_run_sse_to_stdio` ingestion path with patched `httpx` and a dummy shell
command.
* The module's CLI entry-point executed via `python3 -m mcpgateway.translate`
(tested with `runpy`).
Run with:
pytest -q --cov=mcpgateway.translate
"""
# ---------------------------------------------------------------------------#
# Imports #
# ---------------------------------------------------------------------------#
# Future
from __future__ import annotations
# Standard
# Standard Library
import asyncio
import importlib
import sys
import types
from typing import Sequence
from unittest.mock import AsyncMock, MagicMock, Mock
# Third-Party
from fastapi.testclient import TestClient
import pytest
# import inspect
# ---------------------------------------------------------------------------#
# Pytest fixtures #
# ---------------------------------------------------------------------------#
@pytest.fixture(scope="session")
def event_loop():
"""Provide a fresh event-loop for pytest-asyncio."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture()
def translate():
"""Reload mcpgateway.translate for a pristine state each test."""
sys.modules.pop("mcpgateway.translate", None)
return importlib.import_module("mcpgateway.translate")
def test_translate_importerror(monkeypatch):
# Remove httpx from sys.modules if present
sys.modules.pop("httpx", None)
# Simulate ImportError when importing httpx
# Standard
import builtins
real_import = builtins.__import__
def fake_import(name, *args, **kwargs):
if name == "httpx":
raise ImportError("No module named 'httpx'")
return real_import(name, *args, **kwargs)
monkeypatch.setattr(builtins, "__import__", fake_import)
# Reload the module to trigger the import block
# First-Party
import mcpgateway.translate as translate
importlib.reload(translate)
assert translate.httpx is None
# ---------------------------------------------------------------------------#
# Dummy subprocess plumbing #
# ---------------------------------------------------------------------------#
class _DummyWriter:
def __init__(self):
self.buffer: list[bytes] = []
def write(self, data: bytes):
self.buffer.append(data)
async def drain(self): ...
class _DummyReader:
def __init__(self, lines: Sequence[str]):
self._lines = [ln.encode() for ln in lines]
async def readline(self) -> bytes:
return self._lines.pop(0) if self._lines else b""
class _FakeProc:
"""Mimics `asyncio.subprocess.Process` for full stdio control."""
def __init__(self, lines: Sequence[str]):
self.stdin = _DummyWriter()
self.stdout = _DummyReader(lines)
self.pid = 4321
self.terminated = False
def terminate(self):
self.terminated = True
async def wait(self):
return 0
# ---------------------------------------------------------------------------#
# Tests: _PubSub #
# ---------------------------------------------------------------------------#
@pytest.mark.asyncio
async def test_pubsub_basic(translate):
ps = translate._PubSub()
q = ps.subscribe()
await ps.publish("data")
assert q.get_nowait() == "data"
ps.unsubscribe(q)
assert q not in ps._subscribers
@pytest.mark.asyncio
async def test_pubsub_queuefull_removal(translate):
ps = translate._PubSub()
class _Full(asyncio.Queue):
def put_nowait(self, *_): # type: ignore[override]
raise asyncio.QueueFull
bad = _Full()
ps._subscribers.append(bad)
await ps.publish("x")
assert bad not in ps._subscribers
@pytest.mark.asyncio
async def test_pubsub_double_unsubscribe_and_publish_no_subs(translate):
ps = translate._PubSub()
q = ps.subscribe()
ps.unsubscribe(q)
# Unsubscribing again should not raise
ps.unsubscribe(q)
# Publishing with no subscribers should not raise
await ps.publish("no one listens")
# ---------------------------------------------------------------------------#
# Tests: StdIOEndpoint #
# ---------------------------------------------------------------------------#
@pytest.mark.asyncio
async def test_stdio_endpoint_stop_when_proc_none(translate):
"""Test StdIOEndpoint.stop() returns immediately if _proc is None."""
ps = translate._PubSub()
ep = translate.StdIOEndpoint("echo test", ps)
# Ensure _proc is None (should be by default)
assert ep._proc is None
# Should not raise or do anything
await ep.stop()
@pytest.mark.asyncio
async def test_stdio_endpoint_flow(monkeypatch, translate):
ps = translate._PubSub()
fake = _FakeProc(['{"jsonrpc":"2.0"}\n'])
async def _fake_exec(*_a, **_kw):
return fake
monkeypatch.setattr(translate.asyncio, "create_subprocess_exec", _fake_exec)
ep = translate.StdIOEndpoint("echo hi", ps)
subscriber = ps.subscribe()
await ep.start()
assert (await subscriber.get()).rstrip("\n") == '{"jsonrpc":"2.0"}'
await ep.send("PING\n")
assert fake.stdin.buffer[-1] == b"PING\n"
await ep.stop()
assert fake.terminated
@pytest.mark.asyncio
async def test_stdio_send_without_start(translate):
with pytest.raises(RuntimeError):
await translate.StdIOEndpoint("cmd", translate._PubSub()).send("x")
@pytest.mark.asyncio
async def test_stdio_endpoint_eof_handling(monkeypatch, translate):
"""Test that EOF on stdout is handled properly."""
ps = translate._PubSub()
fake = _FakeProc([]) # No lines, will trigger EOF
async def _fake_exec(*_a, **_kw):
return fake
monkeypatch.setattr(translate.asyncio, "create_subprocess_exec", _fake_exec)
ep = translate.StdIOEndpoint("echo hi", ps)
await ep.start()
# Should exit gracefully when EOF is encountered
await ep.stop()
@pytest.mark.asyncio
async def test_stdio_endpoint_stop_timeout(monkeypatch, translate):
"""Test timeout handling during subprocess termination."""
ps = translate._PubSub()
fake = _FakeProc(['{"test": "data"}\n'])
# Mock wait to timeout
async def _wait_timeout():
raise asyncio.TimeoutError("Process didn't terminate")
fake.wait = _wait_timeout
async def _fake_exec(*_a, **_kw):
return fake
monkeypatch.setattr(translate.asyncio, "create_subprocess_exec", _fake_exec)
ep = translate.StdIOEndpoint("test cmd", ps)
await ep.start()
await ep.stop() # Should handle timeout gracefully
assert fake.terminated
@pytest.mark.asyncio
async def test_stdio_endpoint_stop_cancels_pump(monkeypatch, translate):
ps = translate._PubSub()
fake = _FakeProc(['{"jsonrpc":"2.0"}\n'])
async def _fake_exec(*_a, **_kw):
return fake
monkeypatch.setattr(translate.asyncio, "create_subprocess_exec", _fake_exec)
ep = translate.StdIOEndpoint("echo hi", ps)
await ep.start()
# Simulate pump task still running
assert ep._pump_task is not None
# Stop should cancel the pump task
await ep.stop()
assert fake.terminated
# ---------------------------------------------------------------------------#
# Tests: FastAPI facade (/sse /message /healthz) #
# ---------------------------------------------------------------------------#
@pytest.mark.asyncio
async def test_fastapi_healthz_endpoint(translate):
"""Test the /healthz health check endpoint."""
ps = translate._PubSub()
stdio = translate.StdIOEndpoint("dummy", ps)
app = translate._build_fastapi(ps, stdio)
client = TestClient(app)
response = client.get("/healthz")
assert response.status_code == 200
assert response.text == "ok"
@pytest.mark.asyncio
async def test_fastapi_message_endpoint_valid_json(translate):
"""Test /message endpoint with valid JSON payload."""
ps = translate._PubSub()
stdio = Mock()
stdio.send = AsyncMock()
app = translate._build_fastapi(ps, stdio)
client = TestClient(app)
payload = {"jsonrpc": "2.0", "method": "test", "id": 1}
response = client.post("/message", json=payload)
assert response.status_code == 202
assert response.text == "forwarded"
stdio.send.assert_called_once()
@pytest.mark.asyncio
async def test_fastapi_message_endpoint_invalid_json(translate):
"""Test /message endpoint with invalid JSON payload."""
ps = translate._PubSub()
stdio = Mock()
app = translate._build_fastapi(ps, stdio)
client = TestClient(app)
response = client.post(
"/message",
content="invalid json",
headers={"content-type": "application/json"},
)
assert response.status_code == 400
assert "Invalid JSON payload" in response.text
@pytest.mark.asyncio
async def test_fastapi_message_endpoint_with_session_id(translate):
"""Test /message endpoint with session_id parameter."""
ps = translate._PubSub()
stdio = Mock()
stdio.send = AsyncMock()
app = translate._build_fastapi(ps, stdio)
client = TestClient(app)
payload = {"jsonrpc": "2.0", "method": "test", "id": 1}
response = client.post("/message?session_id=test123", json=payload)
assert response.status_code == 202
stdio.send.assert_called_once()
def test_fastapi_sse_endpoint_basic(translate, monkeypatch):
"""Test basic SSE endpoint functionality."""
ps = translate._PubSub()
stdio = Mock()
# Mock uuid.uuid4 to return predictable session ID
mock_uuid = Mock()
mock_uuid.hex = "test-session-123"
monkeypatch.setattr(translate.uuid, "uuid4", lambda: mock_uuid)
app = translate._build_fastapi(ps, stdio, keep_alive=1)
# Just test that the app was built correctly with the routes
route_paths = [route.path for route in app.routes if hasattr(route, "path")]
assert "/sse" in route_paths
assert "/message" in route_paths
assert "/healthz" in route_paths
def test_fastapi_sse_endpoint_with_messages(translate, monkeypatch):
"""Test SSE endpoint with published messages."""
ps = translate._PubSub()
stdio = Mock()
# Mock uuid.uuid4
mock_uuid = Mock()
mock_uuid.hex = "test-session-456"
monkeypatch.setattr(translate.uuid, "uuid4", lambda: mock_uuid)
app = translate._build_fastapi(ps, stdio, keep_alive=10)
# Just verify the app was built with correct configuration
assert app is not None
# Test that the pubsub system works
q = ps.subscribe()
assert q in ps._subscribers
@pytest.mark.asyncio
async def test_fastapi_cors_enabled(translate):
"""Test CORS middleware is properly configured."""
ps = translate._PubSub()
stdio = Mock()
cors_origins = ["https://example.com", "http://localhost:3000"]
app = translate._build_fastapi(ps, stdio, cors_origins=cors_origins)
client = TestClient(app)
# Test basic request to check CORS headers are present
response = client.get("/healthz")
assert response.status_code == 200
def test_fastapi_custom_paths(translate):
"""Test custom SSE and message paths."""
ps = translate._PubSub()
stdio = Mock()
stdio.send = AsyncMock()
app = translate._build_fastapi(ps, stdio, sse_path="/custom-sse", message_path="/custom-message")
# Check that custom paths exist
route_paths = [route.path for route in app.routes if hasattr(route, "path")]
assert "/custom-sse" in route_paths
assert "/custom-message" in route_paths
assert "/healthz" in route_paths # Default health endpoint should still exist
def test_build_fastapi_with_cors_and_keepalive(translate):
ps = translate._PubSub()
stdio = Mock()
app = translate._build_fastapi(ps, stdio, keep_alive=5, cors_origins=["*"])
assert app is not None
# Check CORS middleware is present
assert any("CORSMiddleware" in str(m) for m in app.user_middleware)
@pytest.mark.asyncio
async def test_sse_event_gen_unsubscribes_on_disconnect(monkeypatch, translate):
ps = translate._PubSub()
stdio = Mock()
app = translate._build_fastapi(ps, stdio)
# Patch request to simulate disconnect after first yield
class DummyRequest:
def __init__(self):
self.base_url = "http://test/"
self._disconnected = False
async def is_disconnected(self):
if not self._disconnected:
self._disconnected = True
return False
return True
# Get the /sse route handler
for route in app.routes:
if getattr(route, "path", None) == "/sse":
handler = route.endpoint
break
# Call the handler and exhaust the generator
resp = await handler(DummyRequest())
# The generator should unsubscribe after disconnect (no error)
assert resp is not None
# ---------------------------------------------------------------------------#
# Tests: _parse_args #
# ---------------------------------------------------------------------------#
def test_parse_args_ok(translate):
ns = translate._parse_args(["--stdio", "echo hi", "--port", "9001"])
assert (ns.stdio, ns.port) == ("echo hi", 9001)
def test_parse_args_sse_ok(translate):
ns = translate._parse_args(["--sse", "http://up.example/sse"])
assert ns.sse and ns.stdio is None
def test_parse_args_not_implemented(translate):
with pytest.raises(NotImplementedError):
translate._parse_args(["--streamableHttp", "on"])
def test_parse_args_with_cors(translate):
"""Test parsing CORS arguments."""
ns = translate._parse_args(["--stdio", "echo hi", "--cors", "https://example.com", "http://localhost:3000"])
assert ns.cors == ["https://example.com", "http://localhost:3000"]
def test_parse_args_with_oauth(translate):
"""Test parsing OAuth2 Bearer token."""
ns = translate._parse_args(["--sse", "http://example.com/sse", "--oauth2Bearer", "test-token-123"])
assert ns.oauth2Bearer == "test-token-123"
def test_parse_args_log_level(translate):
"""Test parsing log level."""
ns = translate._parse_args(["--stdio", "echo hi", "--logLevel", "debug"])
assert ns.logLevel == "debug"
def test_parse_args_missing_required(translate):
# Standard
pass
argv = []
# Should exit with SystemExit due to missing required argument
with pytest.raises(SystemExit):
translate._parse_args(argv)
# ---------------------------------------------------------------------------#
# Tests: _run_stdio_to_sse orchestration #
# ---------------------------------------------------------------------------#
@pytest.mark.asyncio
async def test_run_stdio_to_sse(monkeypatch, translate):
async def _test_logic():
calls: list[str] = []
class _DummyStd:
def __init__(self, *_):
calls.append("init")
async def start(self):
calls.append("start")
async def stop(self):
calls.append("stop")
class _Cfg:
"""Accept any args/kwargs so signature matches real uvicorn.Config."""
def __init__(self, *args, **kwargs):
self.__dict__.update(kwargs)
class _Srv:
def __init__(self, cfg):
self.cfg = cfg
self.served = False
self.shutdown_called = False
async def serve(self):
self.served = True
async def shutdown(self):
self.shutdown_called = True
monkeypatch.setattr(translate, "StdIOEndpoint", _DummyStd)
monkeypatch.setattr(translate.uvicorn, "Config", _Cfg)
monkeypatch.setattr(translate.uvicorn, "Server", _Srv)
monkeypatch.setattr(
translate.asyncio,
"get_running_loop",
lambda: types.SimpleNamespace(add_signal_handler=lambda *_, **__: None),
)
await translate._run_stdio_to_sse("cmd", port=0)
assert calls == ["init", "start", "stop"]
# Add timeout to prevent hanging
await asyncio.wait_for(_test_logic(), timeout=3.0)
@pytest.mark.asyncio
async def test_run_stdio_to_sse_with_cors(monkeypatch, translate):
"""Test _run_stdio_to_sse with CORS configuration."""
async def _test_logic():
calls: list[str] = []
class _DummyStd:
def __init__(self, *_):
calls.append("init")
async def start(self):
calls.append("start")
async def stop(self):
calls.append("stop")
class _Cfg:
def __init__(self, *args, **kwargs):
self.__dict__.update(kwargs)
class _Srv:
def __init__(self, cfg):
self.cfg = cfg
async def serve(self):
pass
async def shutdown(self):
pass
monkeypatch.setattr(translate, "StdIOEndpoint", _DummyStd)
monkeypatch.setattr(translate.uvicorn, "Config", _Cfg)
monkeypatch.setattr(translate.uvicorn, "Server", _Srv)
monkeypatch.setattr(
translate.asyncio,
"get_running_loop",
lambda: types.SimpleNamespace(add_signal_handler=lambda *_, **__: None),
)
cors_origins = ["https://example.com"]
await translate._run_stdio_to_sse("cmd", port=0, cors=cors_origins)
assert calls == ["init", "start", "stop"]
# Add timeout to prevent hanging
await asyncio.wait_for(_test_logic(), timeout=3.0)
@pytest.mark.asyncio
async def test_run_stdio_to_sse_signal_handling_windows(monkeypatch, translate):
"""Test signal handling when add_signal_handler raises NotImplementedError (Windows)."""
async def _test_logic():
class _DummyStd:
def __init__(self, cmd, pubsub): # Accept the required arguments
self.cmd = cmd
self.pubsub = pubsub
async def start(self):
pass
async def stop(self):
pass
class _Cfg:
def __init__(self, *args, **kwargs):
pass
class _Srv:
def __init__(self, cfg):
pass
async def serve(self):
pass
async def shutdown(self):
pass
def _failing_signal_handler(*args, **kwargs):
raise NotImplementedError("Windows doesn't support add_signal_handler")
monkeypatch.setattr(translate, "StdIOEndpoint", _DummyStd)
monkeypatch.setattr(translate.uvicorn, "Config", _Cfg)
monkeypatch.setattr(translate.uvicorn, "Server", _Srv)
monkeypatch.setattr(
translate.asyncio,
"get_running_loop",
lambda: types.SimpleNamespace(add_signal_handler=_failing_signal_handler),
)
# Should complete without error despite signal handler failure
await translate._run_stdio_to_sse("cmd", port=0)
# Add timeout to prevent hanging
await asyncio.wait_for(_test_logic(), timeout=3.0)
# ---------------------------------------------------------------------------#
# Tests: _run_sse_to_stdio (stubbed I/O) #
# ---------------------------------------------------------------------------#
@pytest.mark.asyncio
async def test_run_sse_to_stdio(monkeypatch, translate):
async def _test_logic():
class _DummyShell(_FakeProc):
def __init__(self):
super().__init__(lines=[])
dummy_proc = _DummyShell()
async def _fake_shell(*_a, **_kw):
return dummy_proc
monkeypatch.setattr(translate.asyncio, "create_subprocess_shell", _fake_shell)
# Ensure translate.httpx exists before monkey-patching
# Third-Party
import httpx as _real_httpx # noqa: WPS433
setattr(translate, "httpx", _real_httpx)
# Patch httpx.AsyncClient so no real HTTP happens
class _Resp:
async def __aenter__(self):
return self
async def __aexit__(self, *_): ...
async def aiter_lines(self):
if False: # never yield
yield ""
class _Client:
def __init__(self, *_, **__): ...
async def __aenter__(self):
return self
async def __aexit__(self, *_): ...
def stream(self, *_a, **_kw):
return _Resp()
monkeypatch.setattr(translate.httpx, "AsyncClient", _Client)
await translate._run_sse_to_stdio("http://dummy/sse", None) # exits quickly
# Add timeout to prevent hanging
await asyncio.wait_for(_test_logic(), timeout=3.0)
@pytest.mark.asyncio
async def test_run_sse_to_stdio_with_auth(monkeypatch, translate):
"""Test _run_sse_to_stdio with OAuth2 Bearer authentication."""
async def _test_logic():
class _DummyShell(_FakeProc):
def __init__(self):
super().__init__(lines=[])
dummy_proc = _DummyShell()
async def _fake_shell(*_a, **_kw):
return dummy_proc
monkeypatch.setattr(translate.asyncio, "create_subprocess_shell", _fake_shell)
# Third-Party
import httpx as _real_httpx
setattr(translate, "httpx", _real_httpx)
# Track the headers passed to httpx.AsyncClient
captured_headers = {}
class _Resp:
async def __aenter__(self):
return self
async def __aexit__(self, *_):
pass
async def aiter_lines(self):
return
yield "" # pragma: no cover
class _Client:
def __init__(self, *_, headers=None, **__):
nonlocal captured_headers
captured_headers = headers or {}
async def __aenter__(self):
return self
async def __aexit__(self, *_):
pass
def stream(self, *_a, **_kw):
return _Resp()
monkeypatch.setattr(translate.httpx, "AsyncClient", _Client)
await translate._run_sse_to_stdio("http://dummy/sse", "test-bearer-token")
assert captured_headers.get("Authorization") == "Bearer test-bearer-token"
# Add timeout to prevent hanging
await asyncio.wait_for(_test_logic(), timeout=3.0)
@pytest.mark.asyncio
async def test_run_sse_to_stdio_with_data_processing(monkeypatch, translate):
"""Test _run_sse_to_stdio with actual SSE data processing."""
async def _test_logic():
written_data = []
# Mock subprocess to capture stdin data
class _DummyStdin:
def write(self, data):
written_data.append(data)
async def drain(self):
pass
class _DummyStdout:
async def readline(self):
return b"" # EOF immediately
class _DummyProc:
def __init__(self):
self.stdin = _DummyStdin()
self.stdout = _DummyStdout()
dummy_proc = _DummyProc()
async def _fake_shell(*_a, **_kw):
return dummy_proc
monkeypatch.setattr(translate.asyncio, "create_subprocess_shell", _fake_shell)
# Mock httpx to simulate SSE response that terminates quickly
# Third-Party
import httpx as _real_httpx
setattr(translate, "httpx", _real_httpx)
lines_yielded = 0
class _Resp:
async def __aenter__(self):
return self
async def __aexit__(self, *_):
pass
async def aiter_lines(self):
nonlocal lines_yielded
# Yield a few lines then stop to prevent infinite loop
if lines_yielded == 0:
lines_yielded += 1
yield "event: message"
elif lines_yielded == 1:
lines_yielded += 1
yield 'data: {"jsonrpc":"2.0","result":"test"}'
elif lines_yielded == 2:
lines_yielded += 1
yield ""
# After 3 yields, stop iteration
return
class _Client:
def __init__(self, *_, **__):
pass
async def __aenter__(self):
return self
async def __aexit__(self, *_):
pass
def stream(self, *_a, **_kw):
return _Resp()
monkeypatch.setattr(translate.httpx, "AsyncClient", _Client)
# This should complete quickly now
await translate._run_sse_to_stdio("http://dummy/sse", None)
# # Verify that data was processed
# assert len(written_data) > 0
# Add timeout to prevent hanging
await asyncio.wait_for(_test_logic(), timeout=5.0)
@pytest.mark.asyncio
async def test_run_sse_to_stdio_importerror(monkeypatch, translate):
monkeypatch.setattr(translate, "httpx", None)
with pytest.raises(ImportError):
await translate._run_sse_to_stdio("http://dummy/sse", None)
@pytest.mark.asyncio
async def test_pump_sse_to_stdio_full(monkeypatch, translate):
# Prepare fake process with mock stdin
written = []
class DummyStdin:
def write(self, data):
written.append(data)
async def drain(self):
written.append("drained")
class DummyProcess:
stdin = DummyStdin()
# Prepare fake response with aiter_lines
lines = [
"event: message",
"data: ", # Should be skipped
"data: {}", # Should be skipped
'data: {"jsonrpc":"2.0","result":"ok"}', # Should be written
"data: another", # Should be written
"notdata: ignored", # Should be ignored
]
class DummyResponse:
async def __aenter__(self):
return self
async def __aexit__(self, *a):
pass
async def aiter_lines(self):
for line in lines:
yield line
class DummyClient:
async def __aenter__(self):
return self
async def __aexit__(self, *a):
pass
def stream(self, *a, **k):
return DummyResponse()
# Patch httpx.AsyncClient to return DummyClient
monkeypatch.setattr(translate, "httpx", MagicMock())
translate.httpx.AsyncClient = MagicMock(return_value=DummyClient())
# Patch asyncio.create_subprocess_shell to return DummyProcess
monkeypatch.setattr(translate.asyncio, "create_subprocess_shell", AsyncMock(return_value=DummyProcess()))
# Patch process.stdout so read_stdout() exits immediately
class DummyStdout:
async def readline(self):
return b""
DummyProcess.stdout = DummyStdout()
# Actually call _run_sse_to_stdio, which will define and call pump_sse_to_stdio
await translate._run_sse_to_stdio("http://dummy/sse", None)
# Check that only the correct data was written and drained
# Should skip empty and {} data, write the others
assert b'{"jsonrpc":"2.0","result":"ok"}\n' in written
assert b"another\n" in written
assert "drained" in written
# ---------------------------------------------------------------------------#
# Tests: CLI entry-point (`python3 -m mcpgateway.translate`) #
# ---------------------------------------------------------------------------#
def test_module_entrypoint(monkeypatch, translate):
"""Test that the module can be executed as __main__."""
executed: list[str] = []
def _fake_main(argv=None):
executed.append("main_called")
monkeypatch.setattr(translate, "main", _fake_main)
monkeypatch.setattr(sys, "argv", ["mcpgateway.translate", "--stdio", "echo hi"])
# Test the __main__ block logic
if __name__ != "__main__": # We're in test, simulate the condition
translate.main() # This would be called in the __main__ block
assert executed == ["main_called"]
def test_main_function_stdio(monkeypatch, translate):
"""Test main() function with --stdio argument."""
executed: list[str] = []
async def _fake_stdio_runner(*args):
executed.append("stdio")
def _fake_asyncio_run(coro):
# Properly close the coroutine to prevent "never awaited" warning
executed.append("asyncio_run")
try:
coro.close()
except GeneratorExit:
pass
return None
monkeypatch.setattr(translate, "_run_stdio_to_sse", _fake_stdio_runner)
monkeypatch.setattr(translate.asyncio, "run", _fake_asyncio_run)
# Test that main() calls the right function
translate.main(["--stdio", "echo test"])
assert "asyncio_run" in executed
def test_main_function_sse(monkeypatch, translate):
"""Test main() function with --sse argument."""
executed: list[str] = []
async def _fake_sse_runner(*args):
executed.append("sse")
def _fake_asyncio_run(coro):
executed.append("asyncio_run")
try:
coro.close()
except GeneratorExit:
pass
return None
monkeypatch.setattr(translate.asyncio, "run", _fake_asyncio_run)
translate.main(["--sse", "http://example.com/sse"])
assert "asyncio_run" in executed
def test_main_function_keyboard_interrupt(monkeypatch, translate, capsys):
"""Test main() function handles KeyboardInterrupt gracefully."""
def _raise_keyboard_interrupt(*args):
raise KeyboardInterrupt()
monkeypatch.setattr(translate.asyncio, "run", _raise_keyboard_interrupt)
with pytest.raises(SystemExit) as exc_info:
translate.main(["--stdio", "echo test"])
assert exc_info.value.code == 0
captured = capsys.readouterr()
assert captured.out == "\n" # Should print newline to restore shell prompt
def test_main_function_not_implemented_error(monkeypatch, translate, capsys):
"""Test main() function handles NotImplementedError."""
# def _raise_not_implemented(coro, *a, **kw):
# # close the coroutine if the autouse fixture didn't remove it
# if hasattr(coro, "close"):
# coro.close()
# raise NotImplementedError("Test error message")
def _raise_not_implemented(*args):
raise NotImplementedError("Test error message")
monkeypatch.setattr(translate.asyncio, "run", _raise_not_implemented)
with pytest.raises(SystemExit) as exc_info:
translate.main(["--stdio", "echo test"])
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "Test error message" in captured.err
def test_main_unknown_args(monkeypatch, translate):
monkeypatch.setattr(
translate, "_parse_args", lambda argv: type("Args", (), {"stdio": None, "sse": None, "streamableHttp": None, "logLevel": "info", "cors": None, "oauth2Bearer": None, "port": 8000})()
)
# Just call main and assert it returns None (does not raise)
assert translate.main(["--unknown"]) is None
# ---------------------------------------------------------------------------#
# Tests: Edge cases and error paths #
# ---------------------------------------------------------------------------#
@pytest.mark.asyncio
async def test_pubsub_unsubscribe_missing_queue(translate):
"""Test unsubscribing a queue that's not in the subscribers list."""
ps = translate._PubSub()
q = asyncio.Queue()
# Should not raise an exception
ps.unsubscribe(q)
def test_stdio_endpoint_already_stopped(translate):
"""Test stopping an endpoint that's not running."""
ps = translate._PubSub()
ep = translate.StdIOEndpoint("echo test", ps)
# Should not raise an exception - but make this synchronous test
# since we're not actually starting anything async
assert ep._proc is None
def test_build_fastapi_no_cors(translate):
"""Test _build_fastapi without CORS origins."""
ps = translate._PubSub()
stdio = Mock()
# Should work without CORS origins
app = translate._build_fastapi(ps, stdio, cors_origins=None)
assert app is not None
# Check that routes exist
route_paths = [route.path for route in app.routes if hasattr(route, "path")]
assert "/sse" in route_paths
assert "/message" in route_paths
assert "/healthz" in route_paths
def test_fastapi_sse_client_disconnect(translate, monkeypatch):
"""Test SSE endpoint when client disconnects."""
ps = translate._PubSub()
stdio = Mock()
app = translate._build_fastapi(ps, stdio, keep_alive=1)
# Just test that the app has the SSE route
sse_routes = [route for route in app.routes if hasattr(route, "path") and route.path == "/sse"]
assert len(sse_routes) == 1
@pytest.mark.asyncio
async def test_stdio_endpoint_exception_in_pump(monkeypatch, translate):
"""Test _pump_stdout exception handling."""
async def _test_logic():
ps = translate._PubSub()
# Create a fake process that will raise an exception immediately
class _FakeProcWithError:
def __init__(self):
self.stdin = _DummyWriter()
self.pid = 1234
self.terminated = False
self.stdout = self
def terminate(self):
self.terminated = True
async def wait(self):
return 0
async def readline(self):
# Always raise an exception immediately
raise Exception("Test exception in pump")
fake_proc = _FakeProcWithError()
async def _fake_exec(*_a, **_kw):
return fake_proc
monkeypatch.setattr(translate.asyncio, "create_subprocess_exec", _fake_exec)
ep = translate.StdIOEndpoint("echo hi", ps)
# Start the endpoint - the pump task will be created but fail immediately
await ep.start()
# Just verify the task exists and clean up quickly
assert ep._pump_task is not None
await ep.stop()
# Add timeout to prevent hanging
await asyncio.wait_for(_test_logic(), timeout=3.0)
@pytest.mark.asyncio
async def test_stdio_endpoint_send_not_started(translate):
ep = translate.StdIOEndpoint("cmd", translate._PubSub())
with pytest.raises(RuntimeError):
await ep.send("test")