"""Cancel integration tests for CLI Agent MCP Server.
These tests verify that "canceling a request does not cause the server to exit".
Test scenarios:
1. Start MCP server subprocess
2. Send `call_tool` request (using fake CLI to simulate long-running operation)
3. Trigger cancellation (SIGINT or MCP protocol cancellation)
4. Verify: server process is still alive
5. Verify: can continue processing subsequent requests
Requirements (from REFACTOR_SPEC.md Phase 0):
- Cancellation should not cause server exit
- Cancellation should terminate the running CLI subprocess (no orphan processes)
These tests are marked with @pytest.mark.integration for selective execution:
pytest -m integration tests/test_cancel_integration.py
"""
from __future__ import annotations
import asyncio
import json
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, AsyncGenerator
from unittest import mock
import pytest
# Add project paths
PROJECT_ROOT = Path(__file__).parent.parent
FIXTURES_DIR = PROJECT_ROOT / "tests" / "fixtures"
SRC_DIR = PROJECT_ROOT / "src"
SHARED_DIR = PROJECT_ROOT / "shared"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
if str(SHARED_DIR) not in sys.path:
sys.path.insert(0, str(SHARED_DIR))
# Path to fake CLI script
FAKE_CLI_PATH = FIXTURES_DIR / "fake_cli.py"
# ============================================================================
# MCP Protocol Helpers
# ============================================================================
class MCPClient:
"""Simple MCP client for testing.
Communicates with MCP server via stdin/stdout using JSON-RPC 2.0 protocol.
"""
def __init__(self, process: asyncio.subprocess.Process):
self.process = process
self._request_id = 0
self._read_task: asyncio.Task | None = None
self._responses: dict[int, asyncio.Future] = {}
self._notifications: list[dict] = []
def _next_id(self) -> int:
self._request_id += 1
return self._request_id
async def send_request(self, method: str, params: dict | None = None) -> dict:
"""Send a JSON-RPC request and wait for response."""
request_id = self._next_id()
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": method,
}
if params:
request["params"] = params
# Create response future
response_future: asyncio.Future = asyncio.get_event_loop().create_future()
self._responses[request_id] = response_future
# Send request
request_json = json.dumps(request) + "\n"
assert self.process.stdin is not None
self.process.stdin.write(request_json.encode())
await self.process.stdin.drain()
# Wait for response with timeout
try:
response = await asyncio.wait_for(response_future, timeout=30.0)
return response
except asyncio.TimeoutError:
del self._responses[request_id]
raise TimeoutError(f"No response for request {request_id}: {method}")
async def send_notification(self, method: str, params: dict | None = None) -> None:
"""Send a JSON-RPC notification (no response expected)."""
notification = {
"jsonrpc": "2.0",
"method": method,
}
if params:
notification["params"] = params
notification_json = json.dumps(notification) + "\n"
assert self.process.stdin is not None
self.process.stdin.write(notification_json.encode())
await self.process.stdin.drain()
async def read_messages(self) -> None:
"""Read messages from server stdout in background."""
assert self.process.stdout is not None
while True:
try:
line = await self.process.stdout.readline()
if not line:
break
try:
message = json.loads(line.decode())
except json.JSONDecodeError:
continue
# Handle response
if "id" in message and message["id"] in self._responses:
future = self._responses.pop(message["id"])
if not future.done():
future.set_result(message)
else:
# Notification or unknown message
self._notifications.append(message)
except asyncio.CancelledError:
break
except Exception:
break
async def start_reading(self) -> None:
"""Start background reading task."""
self._read_task = asyncio.create_task(self.read_messages())
async def stop_reading(self) -> None:
"""Stop background reading task."""
if self._read_task:
self._read_task.cancel()
try:
await self._read_task
except asyncio.CancelledError:
pass
async def initialize(self) -> dict:
"""Send MCP initialize request."""
return await self.send_request("initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "1.0.0"},
})
async def list_tools(self) -> dict:
"""Send tools/list request."""
return await self.send_request("tools/list", {})
async def call_tool(self, name: str, arguments: dict) -> dict:
"""Send tools/call request."""
return await self.send_request("tools/call", {
"name": name,
"arguments": arguments,
})
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def fake_cli_path() -> Path:
"""Return path to fake CLI script."""
return FAKE_CLI_PATH
@pytest.fixture
def project_root_path() -> Path:
"""Return project root path."""
return PROJECT_ROOT
# ============================================================================
# Unit Tests for Fake CLI
# ============================================================================
class TestFakeCLI:
"""Test the fake CLI script itself."""
@pytest.mark.asyncio
async def test_fake_cli_outputs_jsonl(self, fake_cli_path: Path, tmp_path: Path):
"""Test that fake CLI outputs valid JSONL events."""
process = await asyncio.create_subprocess_exec(
sys.executable, str(fake_cli_path),
"--duration", "1",
"--interval", "0.2",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Send prompt via stdin
assert process.stdin is not None
process.stdin.write(b"test prompt")
await process.stdin.drain()
process.stdin.close()
# Read output
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=10)
# Parse JSONL output
events = []
for line in stdout.decode().strip().split("\n"):
if line:
events.append(json.loads(line))
# Verify events
assert len(events) > 0
# Check for session event
session_events = [e for e in events if e.get("type") == "session"]
assert len(session_events) >= 1
# Check for message events
message_events = [e for e in events if e.get("type") == "message"]
assert len(message_events) >= 1
assert process.returncode == 0
@pytest.mark.asyncio
async def test_fake_cli_responds_to_sigterm(self, fake_cli_path: Path):
"""Test that fake CLI responds to SIGTERM and exits gracefully."""
process = await asyncio.create_subprocess_exec(
sys.executable, str(fake_cli_path),
"--duration", "30",
"--interval", "0.5",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Send prompt
assert process.stdin is not None
process.stdin.write(b"test prompt")
await process.stdin.drain()
process.stdin.close()
# Wait a bit for process to start
await asyncio.sleep(0.5)
# Send SIGTERM
process.terminate()
# Wait for process to exit
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=5)
# Verify exit code (128 + 15 = 143 for SIGTERM)
assert process.returncode == 143
# Check output contains cancellation event
output = stdout.decode()
assert "cancelled" in output.lower() or "sigterm" in output.lower()
@pytest.mark.asyncio
async def test_fake_cli_responds_to_sigint(self, fake_cli_path: Path):
"""Test that fake CLI responds to SIGINT and exits gracefully."""
process = await asyncio.create_subprocess_exec(
sys.executable, str(fake_cli_path),
"--duration", "30",
"--interval", "0.5",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Send prompt
assert process.stdin is not None
process.stdin.write(b"test prompt")
await process.stdin.drain()
process.stdin.close()
# Wait a bit for process to start
await asyncio.sleep(0.5)
# Send SIGINT
process.send_signal(signal.SIGINT)
# Wait for process to exit
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=5)
# Verify exit code (128 + 2 = 130 for SIGINT)
assert process.returncode == 130
# Check output contains cancellation event
output = stdout.decode()
assert "cancelled" in output.lower() or "sigint" in output.lower()
# ============================================================================
# Integration Tests for Invoker Cancellation
# ============================================================================
@pytest.mark.integration
class TestInvokerCancellation:
"""Test invoker cancellation behavior.
These tests verify that:
1. Cancelling an invoker execution terminates the subprocess
2. The invoker can be reused after cancellation
3. No orphan processes are left behind
"""
@pytest.mark.asyncio
async def test_invoker_cancellation_terminates_subprocess(
self, fake_cli_path: Path, project_root_path: Path
):
"""Test that cancelling invoker.execute() terminates the subprocess.
Scenario:
1. Start a long-running CLI execution
2. Cancel the execution after some output
3. Verify the subprocess is terminated
4. Verify no orphan processes
"""
from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission
# Create invoker with fake CLI command builder
invoker = CodexInvoker()
# Mock build_command to use fake CLI
original_build_command = invoker.build_command
def fake_build_command(params):
return [
sys.executable, str(fake_cli_path),
"--duration", "30",
"--interval", "0.3",
]
invoker.build_command = fake_build_command
# Create params
params = CodexParams(
prompt="test prompt",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
# Start execution in a task
execution_task = asyncio.create_task(invoker.execute(params))
# Wait for subprocess to start and produce some output
await asyncio.sleep(1.0)
# Get subprocess PID before cancellation
subprocess_pid = invoker._process.pid if invoker._process else None
assert subprocess_pid is not None, "Subprocess should be running"
# Cancel the execution
execution_task.cancel()
# Wait for cancellation to complete
try:
await execution_task
except asyncio.CancelledError:
pass
# Give subprocess time to terminate
await asyncio.sleep(0.5)
# Verify subprocess is terminated
assert invoker._process is None or invoker._process.returncode is not None
# Verify no orphan process
try:
# Check if process still exists
os.kill(subprocess_pid, 0)
pytest.fail(f"Orphan process detected: PID {subprocess_pid}")
except ProcessLookupError:
# Process doesn't exist - good
pass
except PermissionError:
# Process exists but we can't signal it (shouldn't happen in tests)
pass
@pytest.mark.asyncio
async def test_invoker_reusable_after_cancellation(
self, fake_cli_path: Path, project_root_path: Path
):
"""Test that invoker can be reused after cancellation.
Scenario:
1. Start execution and cancel it
2. Start a new execution on the same invoker
3. Verify the new execution completes successfully
"""
from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission
invoker = CodexInvoker()
# Mock build_command
def fake_build_command(params):
return [
sys.executable, str(fake_cli_path),
"--duration", "30" if params.prompt == "long" else "1",
"--interval", "0.2",
]
invoker.build_command = fake_build_command
# First execution - cancel it
params1 = CodexParams(
prompt="long",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
task1 = asyncio.create_task(invoker.execute(params1))
await asyncio.sleep(0.5)
task1.cancel()
try:
await task1
except asyncio.CancelledError:
pass
# Second execution - should complete
params2 = CodexParams(
prompt="short",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
result = await asyncio.wait_for(invoker.execute(params2), timeout=10)
assert result.success is True
# Note: agent_messages may be empty depending on CLI output format
# The key assertion is that execution succeeds after cancellation
# ============================================================================
# Integration Tests for Server Cancellation
# ============================================================================
@pytest.mark.integration
class TestServerCancellation:
"""Test MCP server cancellation behavior.
These tests verify the acceptance criteria from REFACTOR_SPEC.md Phase 0:
- Cancellation does not cause server exit
- Cancellation terminates the running CLI subprocess
NOTE: These tests document the expected behavior AFTER the refactoring.
Before refactoring, some tests may fail due to known issues.
"""
@pytest.mark.asyncio
@pytest.mark.timeout(60)
async def test_sigint_with_active_request_does_not_exit_server(
self, fake_cli_path: Path, project_root_path: Path
):
"""Test that SIGINT during active request does not exit server.
EXPECTED BEHAVIOR (after refactoring):
- SIGINT should cancel the active request
- Server should remain running
- Server should accept new requests
CURRENT BEHAVIOR (before refactoring):
- SIGINT may cause entire server to exit
- This test documents the expected behavior for acceptance testing
"""
# This test requires the server to be properly configured to use fake CLI
# For now, we test at the invoker level which is more directly testable
from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission
# Create a scenario that simulates what the server does
invoker = CodexInvoker()
def fake_build_command(params):
return [
sys.executable, str(fake_cli_path),
"--duration", "30",
"--interval", "0.3",
]
invoker.build_command = fake_build_command
params = CodexParams(
prompt="test",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
# Start execution
task = asyncio.create_task(invoker.execute(params))
# Wait for subprocess to start
await asyncio.sleep(0.8)
assert invoker._process is not None
# Simulate what should happen when SIGINT is received:
# The request should be cancelled, not the server
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# After cancellation, the invoker should be in a clean state
# and ready for new requests
assert invoker._process is None or invoker._process.returncode is not None
# Verify we can start a new execution
def quick_build_command(params):
return [
sys.executable, str(fake_cli_path),
"--duration", "0.5",
"--interval", "0.1",
]
invoker.build_command = quick_build_command
result = await asyncio.wait_for(invoker.execute(params), timeout=10)
assert result.success is True
@pytest.mark.asyncio
async def test_cancelled_request_returns_appropriate_response(
self, fake_cli_path: Path, project_root_path: Path
):
"""Test that cancelled request propagates CancelledError.
The invoker should raise asyncio.CancelledError when cancelled,
allowing the server to handle it appropriately.
"""
from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission
invoker = CodexInvoker()
def fake_build_command(params):
return [
sys.executable, str(fake_cli_path),
"--duration", "30",
"--interval", "0.3",
]
invoker.build_command = fake_build_command
params = CodexParams(
prompt="test",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
task = asyncio.create_task(invoker.execute(params))
await asyncio.sleep(0.5)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
@pytest.mark.asyncio
async def test_concurrent_requests_independent_cancellation(
self, fake_cli_path: Path, project_root_path: Path
):
"""Test that cancelling one request doesn't affect others.
NOTE: This test is for future concurrent request support.
Currently the server may use singleton invokers, but after
refactoring, each request should have independent execution context.
"""
from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission
# Create two independent invokers (simulating per-request context)
invoker1 = CodexInvoker()
invoker2 = CodexInvoker()
def long_command(params):
return [
sys.executable, str(fake_cli_path),
"--duration", "30",
"--interval", "0.3",
]
def short_command(params):
return [
sys.executable, str(fake_cli_path),
"--duration", "2",
"--interval", "0.2",
]
invoker1.build_command = long_command
invoker2.build_command = short_command
params = CodexParams(
prompt="test",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
# Start both executions
task1 = asyncio.create_task(invoker1.execute(params))
task2 = asyncio.create_task(invoker2.execute(params))
await asyncio.sleep(0.5)
# Cancel only task1
task1.cancel()
try:
await task1
except asyncio.CancelledError:
pass
# task2 should still complete successfully
result2 = await asyncio.wait_for(task2, timeout=10)
assert result2.success is True
# ============================================================================
# Process Group Isolation Tests
# ============================================================================
@pytest.mark.integration
class TestProcessGroupIsolation:
"""Test subprocess process group isolation.
These tests verify requirements from REFACTOR_SPEC.md:
- Subprocess should be in separate process group
- SIGINT to parent should not kill child directly
- Parent can reliably terminate child process group
NOTE: Current implementation may not have process group isolation.
These tests document expected behavior after refactoring.
"""
@pytest.mark.asyncio
async def test_subprocess_termination_is_reliable(
self, fake_cli_path: Path, project_root_path: Path
):
"""Test that subprocess is reliably terminated.
Verifies:
1. SIGTERM is sent first
2. SIGKILL is sent if SIGTERM times out
3. No orphan processes remain
"""
from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission
invoker = CodexInvoker()
def fake_build_command(params):
return [
sys.executable, str(fake_cli_path),
"--duration", "60",
"--interval", "0.5",
]
invoker.build_command = fake_build_command
params = CodexParams(
prompt="test",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
task = asyncio.create_task(invoker.execute(params))
await asyncio.sleep(0.5)
pid = invoker._process.pid if invoker._process else None
assert pid is not None
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Wait for cleanup
await asyncio.sleep(1.0)
# Verify process is gone
try:
os.kill(pid, 0)
pytest.fail(f"Process {pid} should have been terminated")
except ProcessLookupError:
pass # Expected - process is gone
# ============================================================================
# Acceptance Test Summary
# ============================================================================
@pytest.mark.integration
class TestAcceptanceCriteria:
"""Acceptance test summary from REFACTOR_SPEC.md Phase 0.
These tests serve as the acceptance criteria for the refactoring:
1. SIGINT with active request: only cancel request, don't exit server
2. SIGINT without active request: exit server (or per config)
3. MCP protocol cancel: only cancel corresponding request
4. Cancelled subprocess doesn't leave orphans
5. Concurrent requests don't pollute each other's state
Run with: pytest -m integration -v tests/test_cancel_integration.py
"""
@pytest.mark.asyncio
async def test_acceptance_subprocess_cleanup(
self, fake_cli_path: Path, project_root_path: Path
):
"""AC: Cancelled subprocess must be cleaned up (no orphans)."""
from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission
invoker = CodexInvoker()
invoker.build_command = lambda p: [
sys.executable, str(fake_cli_path),
"--duration", "30", "--interval", "0.3"
]
params = CodexParams(
prompt="test",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
pids = []
# Run and cancel multiple times
for _ in range(3):
task = asyncio.create_task(invoker.execute(params))
await asyncio.sleep(0.3)
if invoker._process:
pids.append(invoker._process.pid)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await asyncio.sleep(0.5)
# Verify all processes are cleaned up
for pid in pids:
try:
os.kill(pid, 0)
pytest.fail(f"Orphan process detected: {pid}")
except ProcessLookupError:
pass
@pytest.mark.asyncio
async def test_acceptance_invoker_state_reset(
self, fake_cli_path: Path, project_root_path: Path
):
"""AC: Invoker state must be properly reset after cancellation."""
from cli_agent_mcp.shared.invokers import CodexInvoker, CodexParams, Permission
invoker = CodexInvoker()
invoker.build_command = lambda p: [
sys.executable, str(fake_cli_path),
"--duration", "1", "--interval", "0.1"
]
params = CodexParams(
prompt="test",
workspace=project_root_path,
permission=Permission.READ_ONLY,
)
# First: cancel mid-execution
invoker.build_command = lambda p: [
sys.executable, str(fake_cli_path),
"--duration", "30", "--interval", "0.3"
]
task = asyncio.create_task(invoker.execute(params))
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# After cancellation, internal state should be clean
assert invoker._process is None or invoker._process.returncode is not None
# Second: complete successfully
invoker.build_command = lambda p: [
sys.executable, str(fake_cli_path),
"--duration", "0.5", "--interval", "0.1"
]
result = await asyncio.wait_for(invoker.execute(params), timeout=10)
# Should succeed with fresh state
assert result.success is True
# Note: session_id extraction depends on CLI-specific event format
# The key assertion is that execution succeeds with fresh state