Skip to main content
Glama
test_client_integration.py (34.4 kB)
"""Integration tests for the xonsh-based client.""" from __future__ import annotations import asyncio import json import os import queue import shlex import subprocess import sys import time import tempfile import uuid from concurrent.futures import CancelledError, Future from contextlib import contextmanager from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from threading import Thread import pytest import anyio from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client from mcp2term_client.input import InputChunk, QueueInputReader from mcp2term_client.session import ( CommandResponse, RemoteMcpSession, RemoteMcpSessionError, ) from mcp2term_client.shell import RemoteCommandProcessor from mcp2term_client.state import RemoteShellState ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) SERVER_HOST = "127.0.0.1" @contextmanager def running_server(*, extra_env: dict[str, str] | None = None) -> str: import socket env = os.environ.copy() env["PYTHONPATH"] = str(os.path.join(ROOT, "src")) if extra_env: env.update(extra_env) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind((SERVER_HOST, 0)) port = sock.getsockname()[1] script = ( "from mcp2term.config import ServerConfig\n" "from mcp2term.server import create_server\n" "config = ServerConfig.from_env()\n" "server = create_server(config=config)\n" f"server.settings.host = '{SERVER_HOST}'\n" f"server.settings.port = {port}\n" "server.run(transport='streamable-http', mount_path=None)\n" ) command = [sys.executable, "-c", script] process = subprocess.Popen(command, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: server_url = f"http://{SERVER_HOST}:{port}/mcp" _wait_for_server(server_url, process) yield server_url finally: process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() process.wait(timeout=5) def _wait_for_server(url: str, process: subprocess.Popen[str], timeout: 
float = 15.0) -> None: deadline = time.monotonic() + timeout while time.monotonic() < deadline: if process.poll() is not None: stdout, stderr = process.communicate() raise RuntimeError( "Server process exited prematurely:\n" f"STDOUT: {stdout.decode().strip()}\n" f"STDERR: {stderr.decode().strip()}" ) try: if _probe_server(url): return except Exception: pass time.sleep(0.2) process.terminate() stdout, stderr = process.communicate() raise RuntimeError( "Server did not become ready in time:\n" f"STDOUT: {stdout.decode().strip()}\n" f"STDERR: {stderr.decode().strip()}" ) def _probe_server(url: str) -> bool: """Attempt to establish and initialize an MCP session.""" async def attempt() -> bool: try: async with streamablehttp_client(url, timeout=5.0, sse_read_timeout=5.0) as ( read_stream, write_stream, _, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() return True except Exception: return False return anyio.run(attempt) @contextmanager def not_found_server() -> str: class Handler(BaseHTTPRequestHandler): def do_POST(self) -> None: # noqa: N802 self.send_response(404) self.send_header("Content-Type", "text/plain") self.end_headers() self.wfile.write(b"missing") def log_message(self, format: str, *args: object) -> None: # noqa: A003 return server = ThreadingHTTPServer((SERVER_HOST, 0), Handler) thread = Thread(target=server.serve_forever, daemon=True) thread.start() try: url = f"http://{SERVER_HOST}:{server.server_address[1]}" yield url finally: server.shutdown() thread.join(timeout=5) @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_session_executes_command(use_real_dependencies: bool) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() try: cwd = session.resolve_working_directory() state = RemoteShellState(cwd=cwd) response = session.run_command( "echo integration", working_directory=state.cwd, environment=state.environment, ) finally: session.close() assert 
"integration" in response.stdout assert response.return_code == 0 assert response.command_id @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_processor_manages_state(use_real_dependencies: bool) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() try: state = RemoteShellState(cwd=session.resolve_working_directory()) statuses: list[int] = [] processor = RemoteCommandProcessor(session=session, state=state, status_callback=statuses.append) assert processor.execute("pwd") == 0 assert statuses[-1] == 0 assert processor.execute("export TEST_VAR=friend") == 0 assert state.environment["TEST_VAR"] == "friend" assert processor.execute("TEST_VAR=world echo $TEST_VAR") == 0 assert statuses[-1] == 0 assert state.environment["TEST_VAR"] == "friend" assert processor.execute("unset TEST_VAR") == 0 assert "TEST_VAR" not in state.environment assert processor.execute("cd /") == 0 assert state.cwd == "/" finally: session.close() @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_session_normalizes_root_url(use_real_dependencies: bool) -> None: with running_server() as full_url: base_url = full_url.rsplit("/mcp", 1)[0] session = RemoteMcpSession(base_url) session.start() try: assert session.endpoint_url.endswith("/mcp") cwd = session.resolve_working_directory() response = session.run_command( "echo normalized", working_directory=cwd, environment=None, ) finally: session.close() assert "normalized" in response.stdout assert response.return_code == 0 assert response.command_id @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_session_can_cancel_command(use_real_dependencies: bool) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() try: cwd = session.resolve_working_directory() command_id, future = session.run_command_async( "python -c 'import time; time.sleep(5)'", working_directory=cwd, environment=None, ) # Allow the command 
to start before sending the cancellation. time.sleep(0.5) cancel_response = session.cancel_command(command_id) assert cancel_response.command_id == command_id assert cancel_response.delivered response = future.result(timeout=10.0) finally: session.close() assert response.return_code != 0 assert response.command_id == command_id @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_session_reports_diagnostics_for_not_found(use_real_dependencies: bool) -> None: with not_found_server() as url: session = RemoteMcpSession(url) with pytest.raises(RemoteMcpSessionError) as excinfo: session.start() message = str(excinfo.value) assert "HTTP 404" in message assert url in message session.close() @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_session_streams_stdin(use_real_dependencies: bool) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() try: cwd = session.resolve_working_directory() command_id, future = session.run_command_async( "python -c \"import sys; data = sys.stdin.read(); print(data.strip())\"", working_directory=cwd, environment=None, ) delivered = False for _ in range(50): delivered = session.send_stdin(command_id, "hello remote\n") if delivered: break time.sleep(0.1) assert delivered, "Failed to deliver stdin to remote command" eof_delivered = False for _ in range(50): eof_delivered = session.send_stdin(command_id, "", eof=True) if eof_delivered: break time.sleep(0.1) assert eof_delivered, "Failed to deliver EOF to remote command" response = future.result(timeout=10.0) finally: session.close() assert "hello remote" in response.stdout assert response.return_code == 0 @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_processor_handles_interactive_cli(use_real_dependencies: bool) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() try: state = RemoteShellState(cwd=session.resolve_working_directory()) 
statuses: list[int] = [] errors: list[str] = [] input_queue: "queue.Queue[InputChunk]" = queue.Queue() exit_code = -1 verify_response = None def reader_factory() -> QueueInputReader: return QueueInputReader(input_queue) processor = RemoteCommandProcessor( session=session, state=state, status_callback=statuses.append, output_writer=lambda message: None, error_writer=errors.append, input_reader_factory=reader_factory, ) def feed() -> None: time.sleep(0.5) input_queue.put(InputChunk(data="from pathlib import Path\n")) time.sleep(0.2) input_queue.put( InputChunk(data="Path('interactive_flag.txt').write_text('client!')\n") ) time.sleep(0.2) input_queue.put(InputChunk(data="exit()\n")) feeder = Thread(target=feed, daemon=True) feeder.start() exit_code = processor.execute("python") feeder.join(timeout=5) verify_response = session.run_command( "python -c \"import pathlib; print(pathlib.Path('interactive_flag.txt').read_text())\"", working_directory=state.cwd, environment=None, ) session.run_command( "python -c \"import pathlib; pathlib.Path('interactive_flag.txt').unlink(missing_ok=True)\"", working_directory=state.cwd, environment=None, ) finally: session.close() assert exit_code == 0 assert statuses and statuses[-1] == 0 assert not errors assert verify_response is not None assert "client!" 
in verify_response.stdout @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_session_survives_server_warnings(use_real_dependencies: bool) -> None: warning_messages: list[str] = [] response: CommandResponse | None = None follow_up = None with running_server(extra_env={"MCP2TERM_SHELL": "/nonexistent-mcp-shell"}) as url: session = RemoteMcpSession(url, notice_writer=warning_messages.append) session.start() try: response = session.run_command( "echo failure", working_directory="/", environment=None, ) follow_up = session.cancel_command("missing-command") finally: session.close() assert response is not None assert response.return_code != 0 assert response.warnings assert follow_up is not None assert follow_up.warnings assert any("WARNING" in message for message in warning_messages) @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_remote_session_close_cancels_active_requests(use_real_dependencies: bool) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() future: Future[CommandResponse] | None = None try: cwd = session.resolve_working_directory() _, future = session.run_command_async( "python -c 'import time; time.sleep(10)'", working_directory=cwd, environment=None, ) time.sleep(0.5) finally: session.close() assert future is not None assert future.cancelled() with pytest.raises(CancelledError): future.result(timeout=0.1) @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_manage_file_command_executes_remote_operations( use_real_dependencies: bool, ) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() state: RemoteShellState | None = None target_dir = "mcp-file-tests" try: cwd = session.resolve_working_directory() state = RemoteShellState(cwd=cwd) statuses: list[int] = [] outputs: list[str] = [] errors: list[str] = [] processor = RemoteCommandProcessor( session=session, state=state, status_callback=statuses.append, 
output_writer=lambda message: outputs.append(message), error_writer=lambda message: errors.append(message), ) unique_name = f"mcp-file-{uuid.uuid4().hex}.txt" relative_path = f"{target_dir}/{unique_name}" outputs.clear() errors.clear() create_status = processor.execute( f"filetool create {relative_path} --content alpha --create-parents --overwrite" ) assert create_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert outputs outputs.clear() errors.clear() print_status = processor.execute(f"filetool print {relative_path}") assert print_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("| alpha" in line for line in outputs) outputs.clear() errors.clear() locate_status = processor.execute( f"filetool locate {relative_path} --content alpha" ) assert locate_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("Line" in line for line in outputs) outputs.clear() errors.clear() anchor_insert_status = processor.execute( f"filetool insert {relative_path} --content 'epsilon\\\\n' --anchor alpha --anchor-after" ) assert anchor_insert_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("Inserted" in line for line in outputs) outputs.clear() errors.clear() anchor_print_status = processor.execute(f"filetool print {relative_path}") assert anchor_print_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("epsilon" in line for line in outputs) patch_payload = ( f"--- a/{relative_path}\n" f"+++ b/{relative_path}\n" "@@ -1 +1 @@\n" "-alpha\n" "\\ No newline at end of file\n" "+zulu\n" "\\ No newline at end of file\n" ) patch_file = tempfile.NamedTemporaryFile( mode="w", encoding="utf-8", delete=False ) try: patch_file.write(patch_payload) patch_file.flush() patch_path = patch_file.name finally: patch_file.close() try: outputs.clear() errors.clear() patch_status = processor.execute( f"filetool patch {relative_path} --content-from-file 
{shlex.quote(patch_path)}" ) assert patch_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("Applied patch" in line for line in outputs) outputs.clear() errors.clear() print_after_patch = processor.execute( f"filetool print {relative_path}" ) assert print_after_patch == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("zulu" in line for line in outputs) inline_patch_payload = ( f"--- a/{relative_path}\\n" f"+++ b/{relative_path}\\n" "@@ -1 +1 @@\\n" "-zulu\\n" "\\ No newline at end of file\\n" "+inline-escape\\n" "\\ No newline at end of file\\n" ) outputs.clear() errors.clear() inline_patch_status = processor.execute( f"filetool patch {relative_path} --content {shlex.quote(inline_patch_payload)}" ) assert inline_patch_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("Applied patch" in line for line in outputs) outputs.clear() errors.clear() print_after_inline = processor.execute( f"filetool print {relative_path}" ) assert print_after_inline == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("inline-escape" in line for line in outputs) literal_name = f"mcp-literal-{uuid.uuid4().hex}.txt" literal_path = f"{target_dir}/{literal_name}" decoded_payload = "decoded\\ncontent\\n" outputs.clear() errors.clear() write_status = processor.execute( " ".join( [ "filetool", "write", literal_path, "--content", shlex.quote(decoded_payload), "--create-parents", ] ) ) assert write_status == 0 assert statuses and statuses[-1] == 0 assert not errors outputs.clear() errors.clear() print_decoded_status = processor.execute( f"filetool print {literal_path}" ) assert print_decoded_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("| decoded" in line for line in outputs) assert any("| content" in line for line in outputs) literal_payload = "literal\\nvalue" outputs.clear() errors.clear() append_literal_status = processor.execute( " ".join( [ "filetool", "append", 
literal_path, "--content", shlex.quote(literal_payload), "--escape-profile", "none", ] ) ) assert append_literal_status == 0 assert statuses and statuses[-1] == 0 assert not errors outputs.clear() errors.clear() print_literal_status = processor.execute( f"filetool print {literal_path}" ) assert print_literal_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("literal\\nvalue" in line for line in outputs) prepend_payload = "preamble\n" outputs.clear() errors.clear() prepend_status = processor.execute( " ".join( [ "filetool", "prepend", literal_path, "--content", shlex.quote(prepend_payload), ] ) ) assert prepend_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("Prepended" in line or "Created" in line for line in outputs) outputs.clear() errors.clear() print_prepended_status = processor.execute( f"filetool print {literal_path} --start-line 1 --end-line 1" ) assert print_prepended_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("preamble" in line for line in outputs) outputs.clear() errors.clear() substitute_status = processor.execute( " ".join( [ "filetool", "substitute", literal_path, "--pattern", shlex.quote("content"), "--content", shlex.quote("CONTENT"), ] ) ) assert substitute_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("Substituted" in line for line in outputs) assert any("pattern" in line for line in outputs) assert any("replacements" in line for line in outputs) outputs.clear() errors.clear() regex_substitute_status = processor.execute( " ".join( [ "filetool", "substitute", literal_path, "--pattern", shlex.quote(r"^literal"), "--content", shlex.quote("LITERAL"), "--regex", "--ignore-case", "--max-replacements", "1", ] ) ) assert regex_substitute_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("regex" in line for line in outputs) assert any("ignore_case" in line for line in outputs) assert 
any("max_replacements" in line for line in outputs) outputs.clear() errors.clear() final_print_status = processor.execute( f"filetool print {literal_path}" ) assert final_print_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("CONTENT" in line for line in outputs) assert any("LITERAL" in line for line in outputs) finally: os.unlink(patch_path) finally: cleanup_state = state try: if cleanup_state is not None: session.run_command( f"rm -rf {shlex.quote(target_dir)}", working_directory=cleanup_state.cwd, environment=cleanup_state.environment, ) finally: session.close() @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_manage_file_out_of_range_keeps_session_alive( use_real_dependencies: bool, ) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() target_dir = "mcp-file-tests" try: cwd = session.resolve_working_directory() state = RemoteShellState(cwd=cwd) statuses: list[int] = [] outputs: list[str] = [] errors: list[str] = [] processor = RemoteCommandProcessor( session=session, state=state, status_callback=statuses.append, output_writer=lambda message: outputs.append(message), error_writer=lambda message: errors.append(message), ) unique_name = f"mcp-file-{uuid.uuid4().hex}.txt" relative_path = f"{target_dir}/{unique_name}" outputs.clear() errors.clear() create_status = processor.execute( f"filetool create {relative_path} --content placeholder --create-parents --overwrite" ) assert create_status == 0 assert statuses and statuses[-1] == 0 assert not errors outputs.clear() errors.clear() invalid_status = processor.execute( f"filetool print {relative_path} --start-line 480 --end-line 480" ) assert invalid_status != 0 assert statuses and statuses[-1] != 0 assert any( "Start line 480 is beyond the end of the file" in message for message in errors ) outputs.clear() errors.clear() follow_up_status = processor.execute(f"filetool print {relative_path}") assert follow_up_status == 0 assert 
statuses and statuses[-1] == 0 assert not errors assert outputs finally: try: session.run_command( f"rm -rf {shlex.quote(target_dir)}", working_directory=session.resolve_working_directory(), environment=None, ) finally: session.close() @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_filetool_stat_reports_metadata(use_real_dependencies: bool) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() try: state = RemoteShellState(cwd=session.resolve_working_directory()) statuses: list[int] = [] outputs: list[str] = [] errors: list[str] = [] processor = RemoteCommandProcessor( session=session, state=state, status_callback=statuses.append, output_writer=lambda message: outputs.append(message), error_writer=lambda message: errors.append(message), ) target_dir = "mcp-file-tests" file_name = f"stat-{uuid.uuid4().hex}.txt" relative_path = f"{target_dir}/{file_name}" create_status = processor.execute( f"filetool create {relative_path} --content sentinel --create-parents --overwrite" ) assert create_status == 0 assert statuses and statuses[-1] == 0 outputs.clear() errors.clear() stat_status = processor.execute(f"filetool stat {relative_path}") assert stat_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert any("size_bytes" in entry for entry in outputs) assert any("file_type" in entry for entry in outputs) outputs.clear() errors.clear() json_status = processor.execute(f"filetool stat {relative_path} --format json") assert json_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert len(outputs) > 1 json_blob = "\n".join(outputs[1:]) metadata = json.loads(json_blob) assert metadata["exists"] is True assert metadata["follow_symlinks"] is True assert metadata["file_type"] == "file" assert metadata["size_bytes"] >= len("sentinel") finally: try: session.run_command( f"rm -rf {shlex.quote(target_dir)}", working_directory=session.resolve_working_directory(), environment=None, ) finally: 
session.close() @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_filetool_stat_reports_missing_file(use_real_dependencies: bool) -> None: with running_server() as url: session = RemoteMcpSession(url) session.start() try: state = RemoteShellState(cwd=session.resolve_working_directory()) statuses: list[int] = [] outputs: list[str] = [] errors: list[str] = [] processor = RemoteCommandProcessor( session=session, state=state, status_callback=statuses.append, output_writer=lambda message: outputs.append(message), error_writer=lambda message: errors.append(message), ) missing_name = f"missing-{uuid.uuid4().hex}.txt" missing_path = f"ghost/{missing_name}" status = processor.execute( f"filetool stat {missing_path} --no-follow-symlinks" ) assert status != 0 assert statuses and statuses[-1] != 0 assert not outputs assert errors assert any("File not found" in entry for entry in errors) assert any("exists" in entry and "False" in entry for entry in errors) assert any("follow_symlinks" in entry and "False" in entry for entry in errors) finally: session.close() @pytest.mark.parametrize("use_real_dependencies", [False, True]) def test_manage_file_warning_notice_failure_does_not_terminate_session( use_real_dependencies: bool, ) -> None: triggered: list[str] = [] def faulty_notice(message: str) -> None: triggered.append(message) raise RuntimeError("sink failure") with running_server() as url: session = RemoteMcpSession(url, notice_writer=faulty_notice) session.start() target_dir = "mcp-notice-tests" try: cwd = session.resolve_working_directory() state = RemoteShellState(cwd=cwd) statuses: list[int] = [] outputs: list[str] = [] errors: list[str] = [] processor = RemoteCommandProcessor( session=session, state=state, status_callback=statuses.append, output_writer=lambda message: outputs.append(message), error_writer=lambda message: errors.append(message), ) unique_name = f"mcp-file-{uuid.uuid4().hex}.txt" relative_path = f"{target_dir}/{unique_name}" 
outputs.clear() errors.clear() create_status = processor.execute( f"filetool create {relative_path} --content sentinel --create-parents --overwrite" ) assert create_status == 0 assert statuses and statuses[-1] == 0 assert not errors outputs.clear() errors.clear() invalid_status = processor.execute( f"filetool print {relative_path} --start-line 480 --end-line 480" ) assert invalid_status != 0 assert statuses and statuses[-1] != 0 assert any( "Start line 480 is beyond the end of the file" in message for message in errors ) assert triggered, "notice writer should receive at least one warning" outputs.clear() errors.clear() follow_up_status = processor.execute(f"filetool print {relative_path}") assert follow_up_status == 0 assert statuses and statuses[-1] == 0 assert not errors assert outputs finally: try: session.run_command( f"rm -rf {shlex.quote(target_dir)}", working_directory=session.resolve_working_directory(), environment=None, ) finally: session.close()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FreddyE1982/mcp2term'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.