Skip to main content
Glama
test_process_runner.py (19.8 kB)
"""ProcessRunner unit tests. Test coverage: - Basic process execution (stdout reading) - Stdin writing - Process isolation (new session/process group) - Cancellation and termination - Stderr handling - Cleanup behavior (shield from cancellation) """ from __future__ import annotations import asyncio import os import signal import sys from pathlib import Path import pytest # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from cli_agent_mcp.runtime.process_runner import ( IS_WINDOWS, ProcessRunner, ProcessSpec, ) # ============================================================================= # Fixtures # ============================================================================= @pytest.fixture def temp_workspace(tmp_path: Path) -> Path: """Create temporary workspace directory.""" workspace = tmp_path / "workspace" workspace.mkdir() return workspace @pytest.fixture def runner() -> ProcessRunner: """Create ProcessRunner instance with short timeouts for testing.""" return ProcessRunner(term_timeout=0.5, kill_timeout=0.3) # ============================================================================= # Basic Execution Tests # ============================================================================= class TestBasicExecution: """Test basic process execution.""" @pytest.mark.asyncio async def test_simple_command(self, temp_workspace: Path, runner: ProcessRunner): """Test running a simple command.""" if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "echo", "hello"], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["echo", "hello"], cwd=temp_workspace, ) output = [] async for chunk in runner.run(spec): output.append(chunk) result = b"".join(output).decode().strip() assert "hello" in result @pytest.mark.asyncio async def test_multiline_output(self, temp_workspace: Path, runner: ProcessRunner): """Test process with multiple output lines.""" if IS_WINDOWS: # Windows: use type command with a temp file test_file = temp_workspace / 
"test.txt" test_file.write_text("line1\nline2\nline3\n") spec = ProcessSpec( argv=["cmd", "/c", "type", str(test_file)], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["sh", "-c", "echo line1; echo line2; echo line3"], cwd=temp_workspace, ) lines = [] async for chunk in runner.run(spec): lines.append(chunk.decode().strip()) # Filter empty lines lines = [l for l in lines if l] assert len(lines) >= 3 assert "line1" in lines[0] assert "line2" in lines[1] assert "line3" in lines[2] @pytest.mark.asyncio async def test_working_directory(self, temp_workspace: Path, runner: ProcessRunner): """Test that working directory is correctly set.""" if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "cd"], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["pwd"], cwd=temp_workspace, ) output = [] async for chunk in runner.run(spec): output.append(chunk) result = b"".join(output).decode().strip() assert str(temp_workspace) in result or temp_workspace.name in result # ============================================================================= # Stdin Tests # ============================================================================= class TestStdinHandling: """Test stdin handling.""" @pytest.mark.asyncio async def test_stdin_write(self, temp_workspace: Path, runner: ProcessRunner): """Test writing to stdin.""" if IS_WINDOWS: # Windows: use findstr which reads stdin spec = ProcessSpec( argv=["findstr", "."], cwd=temp_workspace, stdin_bytes=b"hello from stdin\n", ) else: spec = ProcessSpec( argv=["cat"], cwd=temp_workspace, stdin_bytes=b"hello from stdin\n", ) output = [] async for chunk in runner.run(spec): output.append(chunk) result = b"".join(output).decode().strip() assert "hello from stdin" in result @pytest.mark.asyncio async def test_stdin_multiline(self, temp_workspace: Path, runner: ProcessRunner): """Test writing multiple lines to stdin.""" stdin_data = b"line1\nline2\nline3\n" if IS_WINDOWS: spec = ProcessSpec( argv=["findstr", "."], cwd=temp_workspace, 
stdin_bytes=stdin_data, ) else: spec = ProcessSpec( argv=["cat"], cwd=temp_workspace, stdin_bytes=stdin_data, ) output = [] async for chunk in runner.run(spec): output.append(chunk) result = b"".join(output).decode() assert "line1" in result assert "line2" in result assert "line3" in result # ============================================================================= # Stderr Handling Tests # ============================================================================= class TestStderrHandling: """Test stderr handling.""" @pytest.mark.asyncio async def test_stderr_callback(self, temp_workspace: Path, runner: ProcessRunner): """Test stderr callback is invoked.""" stderr_chunks: list[bytes] = [] def on_stderr(chunk: bytes) -> None: stderr_chunks.append(chunk) if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "echo error message 1>&2"], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["sh", "-c", "echo 'error message' >&2"], cwd=temp_workspace, ) # Consume all stdout async for _ in runner.run(spec, on_stderr=on_stderr): pass stderr = b"".join(stderr_chunks).decode() assert "error" in stderr.lower() @pytest.mark.asyncio async def test_mixed_stdout_stderr(self, temp_workspace: Path, runner: ProcessRunner): """Test handling both stdout and stderr.""" stderr_chunks: list[bytes] = [] def on_stderr(chunk: bytes) -> None: stderr_chunks.append(chunk) if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "echo stdout & echo stderr 1>&2"], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["sh", "-c", "echo stdout; echo stderr >&2"], cwd=temp_workspace, ) stdout_chunks = [] async for chunk in runner.run(spec, on_stderr=on_stderr): stdout_chunks.append(chunk) stdout = b"".join(stdout_chunks).decode() stderr = b"".join(stderr_chunks).decode() assert "stdout" in stdout assert "stderr" in stderr # ============================================================================= # Process Isolation Tests # 
============================================================================= class TestProcessIsolation: """Test process isolation (new session/process group).""" @pytest.mark.asyncio @pytest.mark.skipif(IS_WINDOWS, reason="POSIX-specific test") async def test_new_session_posix(self, temp_workspace: Path, runner: ProcessRunner): """Test that process runs in new session on POSIX.""" # Get process's session ID - should differ from parent spec = ProcessSpec( argv=["sh", "-c", "ps -o sid= -p $$"], cwd=temp_workspace, ) output = [] async for chunk in runner.run(spec): output.append(chunk) child_sid = b"".join(output).decode().strip() parent_sid = str(os.getsid(os.getpid())) # Child should be in different session assert child_sid != parent_sid @pytest.mark.asyncio @pytest.mark.skipif(IS_WINDOWS, reason="POSIX-specific test") async def test_process_group_posix(self, temp_workspace: Path, runner: ProcessRunner): """Test that process is its own process group leader on POSIX.""" # Process group ID should equal PID when start_new_session=True # Use Python to reliably get pgid since ps output format varies across systems spec = ProcessSpec( argv=["python3", "-c", "import os; print(f'pid={os.getpid()} pgid={os.getpgid(0)}')"], cwd=temp_workspace, ) output = [] async for chunk in runner.run(spec): output.append(chunk) result = b"".join(output).decode().strip() # Parse pid and pgid # Output looks like: pid=12345 pgid=12345 parts = result.split() if len(parts) >= 2: pid_part = parts[0].split("=")[1] if "=" in parts[0] else parts[0] pgid_part = parts[1].split("=")[1] if "=" in parts[1] else parts[1] # They should be equal (process is group leader) assert pid_part.strip() == pgid_part.strip() # ============================================================================= # Termination Tests # ============================================================================= class TestTermination: """Test process termination.""" @pytest.mark.asyncio async def 
test_process_terminates_cleanly( self, temp_workspace: Path, runner: ProcessRunner ): """Test that process terminates cleanly after output consumed.""" if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "echo done"], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["echo", "done"], cwd=temp_workspace, ) async for _ in runner.run(spec): pass # Process should have terminated normally # (no exception raised) @pytest.mark.asyncio async def test_long_running_process_termination( self, temp_workspace: Path, runner: ProcessRunner ): """Test termination of long-running process.""" if IS_WINDOWS: # Windows: ping with -n for count spec = ProcessSpec( argv=["ping", "-n", "100", "127.0.0.1"], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["sleep", "100"], cwd=temp_workspace, ) # Start reading but cancel early count = 0 async for _ in runner.run(spec): count += 1 if count >= 2: # We've seen some output, break to trigger cleanup break # Cleanup should terminate the process # Give a moment for cleanup await asyncio.sleep(0.1) @pytest.mark.asyncio async def test_cancellation_terminates_process( self, temp_workspace: Path, runner: ProcessRunner ): """Test that task cancellation terminates subprocess.""" if IS_WINDOWS: spec = ProcessSpec( argv=["ping", "-n", "100", "127.0.0.1"], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["sleep", "100"], cwd=temp_workspace, ) async def run_process(): async for _ in runner.run(spec): pass task = asyncio.create_task(run_process()) # Let it start await asyncio.sleep(0.2) # Cancel the task task.cancel() try: await task except asyncio.CancelledError: pass # Process should be terminated by cleanup await asyncio.sleep(0.2) # ============================================================================= # Environment Tests # ============================================================================= class TestEnvironment: """Test environment variable handling.""" @pytest.mark.asyncio async def test_custom_environment( self, 
temp_workspace: Path, runner: ProcessRunner ): """Test custom environment variables.""" custom_env = os.environ.copy() custom_env["TEST_VAR"] = "test_value_123" if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "echo %TEST_VAR%"], cwd=temp_workspace, env=custom_env, ) else: spec = ProcessSpec( argv=["sh", "-c", "echo $TEST_VAR"], cwd=temp_workspace, env=custom_env, ) output = [] async for chunk in runner.run(spec): output.append(chunk) result = b"".join(output).decode().strip() assert "test_value_123" in result @pytest.mark.asyncio async def test_inherit_environment( self, temp_workspace: Path, runner: ProcessRunner ): """Test that environment is inherited when env=None.""" # PATH should be inherited if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "echo %PATH%"], cwd=temp_workspace, env=None, # Inherit ) else: spec = ProcessSpec( argv=["sh", "-c", "echo $PATH"], cwd=temp_workspace, env=None, # Inherit ) output = [] async for chunk in runner.run(spec): output.append(chunk) result = b"".join(output).decode().strip() # PATH should be non-empty assert len(result) > 0 # ============================================================================= # ProcessSpec Tests # ============================================================================= class TestProcessSpec: """Test ProcessSpec dataclass.""" def test_frozen(self, temp_workspace: Path): """Test that ProcessSpec is immutable.""" spec = ProcessSpec( argv=["echo", "test"], cwd=temp_workspace, ) with pytest.raises(AttributeError): spec.argv = ["other"] # type: ignore def test_default_values(self, temp_workspace: Path): """Test default values.""" spec = ProcessSpec( argv=["echo"], cwd=temp_workspace, ) assert spec.env is None assert spec.stdin_bytes is None def test_with_all_fields(self, temp_workspace: Path): """Test creation with all fields.""" spec = ProcessSpec( argv=["my-cli", "--arg"], cwd=temp_workspace, env={"KEY": "value"}, stdin_bytes=b"input", ) assert spec.argv == ["my-cli", "--arg"] assert 
spec.cwd == temp_workspace assert spec.env == {"KEY": "value"} assert spec.stdin_bytes == b"input" # ============================================================================= # Edge Cases # ============================================================================= class TestEdgeCases: """Test edge cases and error handling.""" @pytest.mark.asyncio async def test_nonexistent_command( self, temp_workspace: Path, runner: ProcessRunner ): """Test handling of non-existent command.""" spec = ProcessSpec( argv=["nonexistent_command_xyz_123"], cwd=temp_workspace, ) with pytest.raises((FileNotFoundError, OSError)): async for _ in runner.run(spec): pass @pytest.mark.asyncio async def test_exit_code_nonzero( self, temp_workspace: Path, runner: ProcessRunner ): """Test process with non-zero exit code.""" if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "exit 1"], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["sh", "-c", "exit 1"], cwd=temp_workspace, ) # Should complete without exception async for _ in runner.run(spec): pass @pytest.mark.asyncio async def test_empty_output(self, temp_workspace: Path, runner: ProcessRunner): """Test process with no output.""" if IS_WINDOWS: spec = ProcessSpec( argv=["cmd", "/c", "rem"], # No output cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["true"], # No output cwd=temp_workspace, ) output = [] async for chunk in runner.run(spec): output.append(chunk) # Should complete successfully with no output assert len(output) == 0 or b"".join(output).strip() == b"" @pytest.mark.asyncio async def test_large_output(self, temp_workspace: Path, runner: ProcessRunner): """Test handling of large output.""" line_count = 1000 if IS_WINDOWS: # Create a bat file to generate output bat_file = temp_workspace / "gen.bat" bat_content = "@echo off\n" + "echo line\n" * line_count bat_file.write_text(bat_content) spec = ProcessSpec( argv=["cmd", "/c", str(bat_file)], cwd=temp_workspace, ) else: spec = ProcessSpec( argv=["sh", "-c", f"for i in 
$(seq 1 {line_count}); do echo line$i; done"], cwd=temp_workspace, ) output = [] async for chunk in runner.run(spec): output.append(chunk) result = b"".join(output).decode() lines = [l for l in result.strip().split("\n") if l] assert len(lines) >= line_count # ============================================================================= # Cancel Scope Tests # ============================================================================= class TestCancelScope: """Test anyio.CancelScope integration.""" @pytest.mark.asyncio @pytest.mark.timeout(10) async def test_cancel_scope_stops_iteration( self, temp_workspace: Path, runner: ProcessRunner ): """Test that cancel_scope stops the iteration.""" import anyio if IS_WINDOWS: # Windows: use ping which outputs lines periodically spec = ProcessSpec( argv=["ping", "-n", "100", "127.0.0.1"], cwd=temp_workspace, ) else: # POSIX: script that outputs multiple lines quickly spec = ProcessSpec( argv=["sh", "-c", "for i in 1 2 3 4 5 6 7 8 9 10; do echo line$i; done"], cwd=temp_workspace, ) cancel_scope = anyio.CancelScope() output_count = 0 async for _ in runner.run(spec, cancel_scope=cancel_scope): output_count += 1 if output_count >= 3: cancel_scope.cancel() # Should have stopped after ~3 iterations # The process produces 10 lines but we stop at 3 assert 3 <= output_count <= 10

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shiharuharu/cli-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.