# mcp-shell-server
# by tumf
# - mcp-shell-server
# - tests
"""Tests for the ProcessManager class."""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mcp_shell_server.process_manager import ProcessManager
def create_mock_process():
    """Build a stand-in object for a subprocess handle.

    The returned mock reports ``returncode == 0``; its awaitable
    ``communicate`` yields ``(b"output", b"error")`` and its awaitable
    ``wait`` yields ``0``. ``terminate`` and ``kill`` are plain synchronous
    mocks so tests can assert whether they were invoked.
    """
    return MagicMock(
        returncode=0,
        communicate=AsyncMock(return_value=(b"output", b"error")),
        wait=AsyncMock(return_value=0),
        terminate=MagicMock(),
        kill=MagicMock(),
    )
@pytest.fixture
def process_manager():
    """Provide a newly constructed ProcessManager for the requesting test."""
    manager = ProcessManager()
    return manager
@pytest.mark.asyncio
async def test_create_process(process_manager):
    """Test creating a process with basic parameters.

    ``asyncio.create_subprocess_shell`` is replaced by an ``AsyncMock`` so no
    real subprocess is spawned. The test checks that ``create_process`` hands
    back the object produced by the patched factory and that the factory was
    invoked exactly once.
    """
    mock_proc = create_mock_process()

    with patch(
        "mcp_shell_server.process_manager.asyncio.create_subprocess_shell",
        new_callable=AsyncMock,
        return_value=mock_proc,
    ) as mock_create:
        process = await process_manager.create_process(
            "echo 'test'",
            directory="/tmp",
            stdin="input",
        )

    # The mock keeps its call record after the patch is undone, so the
    # checks can run outside the ``with`` block.
    assert process == mock_proc
    mock_create.assert_called_once()
@pytest.mark.asyncio
async def test_execute_with_timeout_success(process_manager):
    """Test that a process finishing in time yields its stdout and stderr."""
    proc = create_mock_process()
    # Spell out the "successful exit" state this test depends on.
    proc.returncode = 0
    proc.communicate.return_value = (b"output", b"error")

    out, err = await process_manager.execute_with_timeout(
        proc, stdin="input", timeout=10
    )

    assert out == b"output"
    assert err == b"error"
    proc.communicate.assert_called_once()
@pytest.mark.asyncio
async def test_execute_with_timeout_timeout(process_manager):
    """Test that a timed-out process raises TimeoutError and is terminated."""
    proc = create_mock_process()
    proc.communicate.side_effect = asyncio.TimeoutError("Process timed out")
    # Simulate a process that is still running (no exit status yet).
    proc.returncode = None

    async def mark_terminated():
        # Simulate the exit status the process has once it has been stopped.
        proc.returncode = -15  # SIGTERM

    proc.wait.side_effect = mark_terminated

    with pytest.raises(TimeoutError):
        await process_manager.execute_with_timeout(proc, timeout=1)

    proc.terminate.assert_called_once()
@pytest.mark.asyncio
async def test_execute_pipeline_success(process_manager):
    """Test that a two-command pipeline returns the final stage's result."""
    first_stage = create_mock_process()
    first_stage.communicate.return_value = (b"output1", b"")
    first_stage.returncode = 0

    last_stage = create_mock_process()
    last_stage.communicate.return_value = (b"final output", b"")
    last_stage.returncode = 0

    # Each subprocess creation hands out the next prepared mock, in order.
    spawn_patch = patch(
        "mcp_shell_server.process_manager.asyncio.create_subprocess_shell",
        new_callable=AsyncMock,
        side_effect=[first_stage, last_stage],
    )
    with spawn_patch as mock_create:
        stdout, stderr, return_code = await process_manager.execute_pipeline(
            ["echo 'test'", "grep test"],
            directory="/tmp",
            timeout=10,
        )

    assert stdout == b"final output"
    assert stderr == b""
    assert return_code == 0

    # One subprocess must have been created per pipeline stage.
    assert mock_create.call_count == 2

    # The first positional argument of each creation call carries the command.
    recorded = mock_create.call_args_list
    first_command = recorded[0].args[0]
    second_command = recorded[1].args[0]
    assert "echo" in first_command
    assert "grep" in second_command
@pytest.mark.asyncio
async def test_execute_pipeline_with_error(process_manager):
    """Test that a failing pipeline command surfaces its stderr as ValueError."""
    failing_proc = create_mock_process()
    failing_proc.returncode = 1
    failing_proc.communicate.return_value = (b"", b"error message")

    # Swap in an async stub so the pipeline receives the failing process.
    stub = AsyncMock(return_value=failing_proc)

    with patch.object(process_manager, "create_process", stub), pytest.raises(
        ValueError, match="error message"
    ):
        await process_manager.execute_pipeline(
            ["invalid_command"],
            directory="/tmp",
        )
@pytest.mark.asyncio
async def test_cleanup_processes(process_manager):
    """Test that cleanup kills only processes that have not exited yet."""
    # ``returncode is None`` marks a process as still running.
    still_running = create_mock_process()
    still_running.returncode = None

    already_done = create_mock_process()
    already_done.returncode = 0

    await process_manager.cleanup_processes([still_running, already_done])

    # The finished process must be left untouched ...
    already_done.kill.assert_not_called()
    already_done.wait.assert_not_called()

    # ... while the running one is killed and then awaited.
    still_running.kill.assert_called_once()
    still_running.wait.assert_awaited_once()
@pytest.mark.asyncio
async def test_create_process_with_error(process_manager):
    """Test that an OSError during process creation is reported as ValueError."""
    spawn_failure = OSError("Failed to create process")
    failing_spawn = patch(
        "asyncio.create_subprocess_shell",
        new_callable=AsyncMock,
        side_effect=spawn_failure,
    )

    with failing_spawn, pytest.raises(ValueError, match="Failed to create process"):
        await process_manager.create_process("invalid command", directory="/tmp")
@pytest.mark.asyncio
async def test_execute_pipeline_empty_commands(process_manager):
    """Test that an empty command list is rejected with ValueError."""
    no_commands = []
    expect_rejection = pytest.raises(ValueError, match="No commands provided")

    with expect_rejection:
        await process_manager.execute_pipeline(no_commands, directory="/tmp")
@pytest.mark.asyncio
async def test_execute_pipeline_timeout(process_manager):
    """Test executing a pipeline that times out.

    ``communicate`` on the mocked process raises ``TimeoutError``; the test
    expects that error to propagate out of ``execute_pipeline`` and the
    process to be killed exactly once.
    """
    mock_proc = create_mock_process()
    mock_proc.communicate.side_effect = TimeoutError("Process timed out")

    # Request an AsyncMock explicitly (as the other tests in this module do)
    # instead of relying on patch's auto-detection of async targets, so the
    # replacement stays awaitable however ``create_process`` is defined.
    with patch.object(
        process_manager,
        "create_process",
        new_callable=AsyncMock,
        return_value=mock_proc,
    ):
        with pytest.raises(TimeoutError, match="Process timed out"):
            await process_manager.execute_pipeline(
                ["sleep 10"],
                directory="/tmp",
                timeout=1,
            )

    mock_proc.kill.assert_called_once()