Skip to main content
Glama
test_cancel_integration.py26.6 kB
"""Cancel integration tests for CLI Agent MCP Server. These tests verify that "canceling a request does not cause the server to exit". Test scenarios: 1. Start MCP server subprocess 2. Send `call_tool` request (using fake CLI to simulate long-running operation) 3. Trigger cancellation (SIGINT or MCP protocol cancellation) 4. Verify: server process is still alive 5. Verify: can continue processing subsequent requests Requirements (from REFACTOR_SPEC.md Phase 0): - Cancellation should not cause server exit - Cancellation should terminate the running CLI subprocess (no orphan processes) These tests are marked with @pytest.mark.integration for selective execution: pytest -m integration tests/test_cancel_integration.py """ from __future__ import annotations import asyncio import json import os import signal import subprocess import sys import time from pathlib import Path from typing import Any, AsyncGenerator from unittest import mock import pytest # Add project paths PROJECT_ROOT = Path(__file__).parent.parent FIXTURES_DIR = PROJECT_ROOT / "tests" / "fixtures" SRC_DIR = PROJECT_ROOT / "src" SHARED_DIR = PROJECT_ROOT / "shared" if str(SRC_DIR) not in sys.path: sys.path.insert(0, str(SRC_DIR)) if str(SHARED_DIR) not in sys.path: sys.path.insert(0, str(SHARED_DIR)) # Path to fake CLI script FAKE_CLI_PATH = FIXTURES_DIR / "fake_cli.py" # ============================================================================ # MCP Protocol Helpers # ============================================================================ class MCPClient: """Simple MCP client for testing. Communicates with MCP server via stdin/stdout using JSON-RPC 2.0 protocol. """ def __init__(self, process: asyncio.subprocess.Process): self.process = process self._request_id = 0 self._read_task: asyncio.Task | None = None self._responses: dict[int, asyncio.Future] = {} self._notifications: list[dict] = [] def _next_id(self) -> int: self._request_id += 1 return self._request_id async def send_request(self, method: str, params: dict | None = None) -> dict: """Send a JSON-RPC request and wait for response.""" request_id = self._next_id() request = { "jsonrpc": "2.0", "id": request_id, "method": method, } if params: request["params"] = params # Create response future response_future: asyncio.Future = asyncio.get_event_loop().create_future() self._responses[request_id] = response_future # Send request request_json = json.dumps(request) + "\n" assert self.process.stdin is not None self.process.stdin.write(request_json.encode()) await self.process.stdin.drain() # Wait for response with timeout try: response = await asyncio.wait_for(response_future, timeout=30.0) return response except asyncio.TimeoutError: del self._responses[request_id] raise TimeoutError(f"No response for request {request_id}: {method}") async def send_notification(self, method: str, params: dict | None = None) -> None: """Send a JSON-RPC notification (no response expected).""" notification = { "jsonrpc": "2.0", "method": method, } if params: notification["params"] = params notification_json = json.dumps(notification) + "\n" assert self.process.stdin is not None self.process.stdin.write(notification_json.encode()) await self.process.stdin.drain() async def read_messages(self) -> None: """Read messages from server stdout in background.""" assert self.process.stdout is not None while True: try: line = await self.process.stdout.readline() if not line: break try: message = json.loads(line.decode()) except json.JSONDecodeError: continue # Handle response if "id" in message and message["id"] in self._responses: future = self._responses.pop(message["id"]) if not future.done(): future.set_result(message) else: # Notification or unknown message self._notifications.append(message) except asyncio.CancelledError: break except Exception: break async def start_reading(self) -> None: """Start background reading task.""" self._read_task = asyncio.create_task(self.read_messages()) async def stop_reading(self) -> None: """Stop background reading task.""" if self._read_task: self._read_task.cancel() try: await self._read_task except asyncio.CancelledError: pass async def initialize(self) -> dict: """Send MCP initialize request.""" return await self.send_request("initialize", { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "test-client", "version": "1.0.0"}, }) async def list_tools(self) -> dict: """Send tools/list request.""" return await self.send_request("tools/list", {}) async def call_tool(self, name: str, arguments: dict) -> dict: """Send tools/call request.""" return await self.send_request("tools/call", { "name": name, "arguments": arguments, }) # ============================================================================ # Fixtures # ============================================================================ @pytest.fixture def fake_cli_path() -> Path: """Return path to fake CLI script.""" return FAKE_CLI_PATH @pytest.fixture def project_root_path() -> Path: """Return project root path.""" return PROJECT_ROOT # ============================================================================ # Unit Tests for Fake CLI # ============================================================================ class TestFakeCLI: """Test the fake CLI script itself.""" @pytest.mark.asyncio async def test_fake_cli_outputs_jsonl(self, fake_cli_path: Path, tmp_path: Path): """Test that fake CLI outputs valid JSONL events.""" process = await asyncio.create_subprocess_exec( sys.executable, str(fake_cli_path), "--duration", "1", "--interval", "0.2", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) # Send prompt via stdin assert process.stdin is not None process.stdin.write(b"test prompt") await process.stdin.drain() process.stdin.close() # Read output stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=10) # Parse JSONL output events = [] for line in stdout.decode().strip().split("\n"): if line: events.append(json.loads(line)) # Verify events assert len(events) > 0 # Check for session event session_events = [e for e in events if e.get("type") == "session"] assert len(session_events) >= 1 # Check for message events message_events = [e for e in events if e.get("type") == "message"] assert len(message_events) >= 1 assert process.returncode == 0 @pytest.mark.asyncio async def test_fake_cli_responds_to_sigterm(self, fake_cli_path: Path): """Test that fake CLI responds to SIGTERM and exits gracefully.""" process = await asyncio.create_subprocess_exec( sys.executable, str(fake_cli_path), "--duration", "30", "--interval", "0.5", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) # Send prompt assert process.stdin is not None process.stdin.write(b"test prompt") await process.stdin.drain() process.stdin.close() # Wait a bit for process to start await asyncio.sleep(0.5) # Send SIGTERM process.terminate() # Wait for process to exit stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=5) # Verify exit code (128 + 15 = 143 for SIGTERM) assert process.returncode == 143 # Check output contains cancellation event output = stdout.decode() assert "cancelled" in output.lower() or "sigterm" in output.lower() @pytest.mark.asyncio async def test_fake_cli_responds_to_sigint(self, fake_cli_path: Path): """Test that fake CLI responds to SIGINT and exits gracefully.""" process = await asyncio.create_subprocess_exec( sys.executable, str(fake_cli_path), "--duration", "30", "--interval", "0.5", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) # Send prompt assert process.stdin is not None process.stdin.write(b"test prompt") await process.stdin.drain() process.stdin.close() # Wait a bit for process to start await asyncio.sleep(0.5) # Send SIGINT process.send_signal(signal.SIGINT) # Wait for process to exit stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=5) # Verify exit code (128 + 2 = 130 for SIGINT) assert process.returncode == 130 # Check output contains cancellation event output = stdout.decode() assert "cancelled" in output.lower() or "sigint" in output.lower() # ============================================================================ # Integration Tests for Invoker Cancellation # ============================================================================ @pytest.mark.integration class TestInvokerCancellation: """Test invoker cancellation behavior. These tests verify that: 1. Cancelling an invoker execution terminates the subprocess 2. The invoker can be reused after cancellation 3. No orphan processes are left behind """ @pytest.mark.asyncio async def test_invoker_cancellation_terminates_subprocess( self, fake_cli_path: Path, project_root_path: Path ): """Test that cancelling invoker.execute() terminates the subprocess. Scenario: 1. Start a long-running CLI execution 2. Cancel the execution after some output 3. Verify the subprocess is terminated 4. Verify no orphan processes """ from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission # Create invoker with fake CLI command builder invoker = CodexInvoker() # Mock build_command to use fake CLI original_build_command = invoker.build_command def fake_build_command(params): return [ sys.executable, str(fake_cli_path), "--duration", "30", "--interval", "0.3", ] invoker.build_command = fake_build_command # Create params params = CodexParams( prompt="test prompt", workspace=project_root_path, permission=Permission.READ_ONLY, ) # Start execution in a task execution_task = asyncio.create_task(invoker.execute(params)) # Wait for subprocess to start and produce some output await asyncio.sleep(1.0) # Get subprocess PID before cancellation subprocess_pid = invoker._process.pid if invoker._process else None assert subprocess_pid is not None, "Subprocess should be running" # Cancel the execution execution_task.cancel() # Wait for cancellation to complete try: await execution_task except asyncio.CancelledError: pass # Give subprocess time to terminate await asyncio.sleep(0.5) # Verify subprocess is terminated assert invoker._process is None or invoker._process.returncode is not None # Verify no orphan process try: # Check if process still exists os.kill(subprocess_pid, 0) pytest.fail(f"Orphan process detected: PID {subprocess_pid}") except ProcessLookupError: # Process doesn't exist - good pass except PermissionError: # Process exists but we can't signal it (shouldn't happen in tests) pass @pytest.mark.asyncio async def test_invoker_reusable_after_cancellation( self, fake_cli_path: Path, project_root_path: Path ): """Test that invoker can be reused after cancellation. Scenario: 1. Start execution and cancel it 2. Start a new execution on the same invoker 3. Verify the new execution completes successfully """ from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission invoker = CodexInvoker() # Mock build_command def fake_build_command(params): return [ sys.executable, str(fake_cli_path), "--duration", "30" if params.prompt == "long" else "1", "--interval", "0.2", ] invoker.build_command = fake_build_command # First execution - cancel it params1 = CodexParams( prompt="long", workspace=project_root_path, permission=Permission.READ_ONLY, ) task1 = asyncio.create_task(invoker.execute(params1)) await asyncio.sleep(0.5) task1.cancel() try: await task1 except asyncio.CancelledError: pass # Second execution - should complete params2 = CodexParams( prompt="short", workspace=project_root_path, permission=Permission.READ_ONLY, ) result = await asyncio.wait_for(invoker.execute(params2), timeout=10) assert result.success is True # Note: agent_messages may be empty depending on CLI output format # The key assertion is that execution succeeds after cancellation # ============================================================================ # Integration Tests for Server Cancellation # ============================================================================ @pytest.mark.integration class TestServerCancellation: """Test MCP server cancellation behavior. These tests verify the acceptance criteria from REFACTOR_SPEC.md Phase 0: - Cancellation does not cause server exit - Cancellation terminates the running CLI subprocess NOTE: These tests document the expected behavior AFTER the refactoring. Before refactoring, some tests may fail due to known issues. """ @pytest.mark.asyncio @pytest.mark.timeout(60) async def test_sigint_with_active_request_does_not_exit_server( self, fake_cli_path: Path, project_root_path: Path ): """Test that SIGINT during active request does not exit server. EXPECTED BEHAVIOR (after refactoring): - SIGINT should cancel the active request - Server should remain running - Server should accept new requests CURRENT BEHAVIOR (before refactoring): - SIGINT may cause entire server to exit - This test documents the expected behavior for acceptance testing """ # This test requires the server to be properly configured to use fake CLI # For now, we test at the invoker level which is more directly testable from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission # Create a scenario that simulates what the server does invoker = CodexInvoker() def fake_build_command(params): return [ sys.executable, str(fake_cli_path), "--duration", "30", "--interval", "0.3", ] invoker.build_command = fake_build_command params = CodexParams( prompt="test", workspace=project_root_path, permission=Permission.READ_ONLY, ) # Start execution task = asyncio.create_task(invoker.execute(params)) # Wait for subprocess to start await asyncio.sleep(0.8) assert invoker._process is not None # Simulate what should happen when SIGINT is received: # The request should be cancelled, not the server task.cancel() try: await task except asyncio.CancelledError: pass # After cancellation, the invoker should be in a clean state # and ready for new requests assert invoker._process is None or invoker._process.returncode is not None # Verify we can start a new execution def quick_build_command(params): return [ sys.executable, str(fake_cli_path), "--duration", "0.5", "--interval", "0.1", ] invoker.build_command = quick_build_command result = await asyncio.wait_for(invoker.execute(params), timeout=10) assert result.success is True @pytest.mark.asyncio async def test_cancelled_request_returns_appropriate_response( self, fake_cli_path: Path, project_root_path: Path ): """Test that cancelled request propagates CancelledError. The invoker should raise asyncio.CancelledError when cancelled, allowing the server to handle it appropriately. """ from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission invoker = CodexInvoker() def fake_build_command(params): return [ sys.executable, str(fake_cli_path), "--duration", "30", "--interval", "0.3", ] invoker.build_command = fake_build_command params = CodexParams( prompt="test", workspace=project_root_path, permission=Permission.READ_ONLY, ) task = asyncio.create_task(invoker.execute(params)) await asyncio.sleep(0.5) task.cancel() with pytest.raises(asyncio.CancelledError): await task @pytest.mark.asyncio async def test_concurrent_requests_independent_cancellation( self, fake_cli_path: Path, project_root_path: Path ): """Test that cancelling one request doesn't affect others. NOTE: This test is for future concurrent request support. Currently the server may use singleton invokers, but after refactoring, each request should have independent execution context. """ from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission # Create two independent invokers (simulating per-request context) invoker1 = CodexInvoker() invoker2 = CodexInvoker() def long_command(params): return [ sys.executable, str(fake_cli_path), "--duration", "30", "--interval", "0.3", ] def short_command(params): return [ sys.executable, str(fake_cli_path), "--duration", "2", "--interval", "0.2", ] invoker1.build_command = long_command invoker2.build_command = short_command params = CodexParams( prompt="test", workspace=project_root_path, permission=Permission.READ_ONLY, ) # Start both executions task1 = asyncio.create_task(invoker1.execute(params)) task2 = asyncio.create_task(invoker2.execute(params)) await asyncio.sleep(0.5) # Cancel only task1 task1.cancel() try: await task1 except asyncio.CancelledError: pass # task2 should still complete successfully result2 = await asyncio.wait_for(task2, timeout=10) assert result2.success is True # ============================================================================ # Process Group Isolation Tests # ============================================================================ @pytest.mark.integration class TestProcessGroupIsolation: """Test subprocess process group isolation. These tests verify requirements from REFACTOR_SPEC.md: - Subprocess should be in separate process group - SIGINT to parent should not kill child directly - Parent can reliably terminate child process group NOTE: Current implementation may not have process group isolation. These tests document expected behavior after refactoring. """ @pytest.mark.asyncio async def test_subprocess_termination_is_reliable( self, fake_cli_path: Path, project_root_path: Path ): """Test that subprocess is reliably terminated. Verifies: 1. SIGTERM is sent first 2. SIGKILL is sent if SIGTERM times out 3. No orphan processes remain """ from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission invoker = CodexInvoker() def fake_build_command(params): return [ sys.executable, str(fake_cli_path), "--duration", "60", "--interval", "0.5", ] invoker.build_command = fake_build_command params = CodexParams( prompt="test", workspace=project_root_path, permission=Permission.READ_ONLY, ) task = asyncio.create_task(invoker.execute(params)) await asyncio.sleep(0.5) pid = invoker._process.pid if invoker._process else None assert pid is not None task.cancel() try: await task except asyncio.CancelledError: pass # Wait for cleanup await asyncio.sleep(1.0) # Verify process is gone try: os.kill(pid, 0) pytest.fail(f"Process {pid} should have been terminated") except ProcessLookupError: pass # Expected - process is gone # ============================================================================ # Acceptance Test Summary # ============================================================================ @pytest.mark.integration class TestAcceptanceCriteria: """Acceptance test summary from REFACTOR_SPEC.md Phase 0. These tests serve as the acceptance criteria for the refactoring: 1. SIGINT with active request: only cancel request, don't exit server 2. SIGINT without active request: exit server (or per config) 3. MCP protocol cancel: only cancel corresponding request 4. Cancelled subprocess doesn't leave orphans 5. Concurrent requests don't pollute each other's state Run with: pytest -m integration -v tests/test_cancel_integration.py """ @pytest.mark.asyncio async def test_acceptance_subprocess_cleanup( self, fake_cli_path: Path, project_root_path: Path ): """AC: Cancelled subprocess must be cleaned up (no orphans).""" from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission invoker = CodexInvoker() invoker.build_command = lambda p: [ sys.executable, str(fake_cli_path), "--duration", "30", "--interval", "0.3" ] params = CodexParams( prompt="test", workspace=project_root_path, permission=Permission.READ_ONLY, ) pids = [] # Run and cancel multiple times for _ in range(3): task = asyncio.create_task(invoker.execute(params)) await asyncio.sleep(0.3) if invoker._process: pids.append(invoker._process.pid) task.cancel() try: await task except asyncio.CancelledError: pass await asyncio.sleep(0.5) # Verify all processes are cleaned up for pid in pids: try: os.kill(pid, 0) pytest.fail(f"Orphan process detected: {pid}") except ProcessLookupError: pass @pytest.mark.asyncio async def test_acceptance_invoker_state_reset( self, fake_cli_path: Path, project_root_path: Path ): """AC: Invoker state must be properly reset after cancellation.""" from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission invoker = CodexInvoker() invoker.build_command = lambda p: [ sys.executable, str(fake_cli_path), "--duration", "1", "--interval", "0.1" ] params = CodexParams( prompt="test", workspace=project_root_path, permission=Permission.READ_ONLY, ) # First: cancel mid-execution invoker.build_command = lambda p: [ sys.executable, str(fake_cli_path), "--duration", "30", "--interval", "0.3" ] task = asyncio.create_task(invoker.execute(params)) await asyncio.sleep(0.5) task.cancel() try: await task except asyncio.CancelledError: pass # After cancellation, internal state should be clean assert invoker._process is None or invoker._process.returncode is not None # Second: complete successfully invoker.build_command = lambda p: [ sys.executable, str(fake_cli_path), "--duration", "0.5", "--interval", "0.1" ] result = await asyncio.wait_for(invoker.execute(params), timeout=10) # Should succeed with fresh state assert result.success is True # Note: session_id extraction depends on CLI-specific event format # The key assertion is that execution succeeds with fresh state

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shiharuharu/cli-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server