"""
Tests for timeout handling in execute_code method.
"""
import queue
import time
from unittest.mock import Mock, patch
import pytest
from mcp_odoo_shell.shell_manager import OdooShellManager
class TestTimeoutHandling:
"""Test timeout behavior in execute_code method."""
def test_timeout_during_output_wait(self, shell_manager_args, fake_process):
"""Test timeout when waiting for output chunks."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
# Mock time to simulate timeout
start_time = 1000.0
with patch('time.time', side_effect=[start_time, start_time + 5, start_time + 31]):
with patch.object(manager.output_queue, 'get', side_effect=queue.Empty()):
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError, match="Command execution timed out after 30 seconds"):
manager.execute_code("test", timeout=30)
def test_timeout_during_queue_empty_exception(self, shell_manager_args, fake_process):
"""Test timeout in the queue.Empty exception handler."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
# Mock time to simulate timeout in exception handler
start_time = 1000.0
with patch('time.time', side_effect=[start_time, start_time + 16]):
with patch.object(manager.output_queue, 'get', side_effect=queue.Empty()):
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError, match="Command execution timed out after 15 seconds"):
manager.execute_code("test", timeout=15)
def test_timeout_custom_duration(self, shell_manager_args, fake_process):
"""Test timeout with custom duration."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
# Mock time to simulate timeout after 5 seconds
start_time = 1000.0
with patch('time.time', side_effect=[start_time, start_time + 6]):
with patch.object(manager.output_queue, 'get', side_effect=queue.Empty()):
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError, match="Command execution timed out after 5 seconds"):
manager.execute_code("test", timeout=5)
def test_no_timeout_when_prompt_received_quickly(self, shell_manager_args, fake_process):
"""Test that no timeout occurs when prompt is received quickly."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
# Mock time to show quick response
start_time = 1000.0
with patch('time.time', side_effect=[start_time, start_time + 1, start_time + 2]):
with patch.object(manager.output_queue, 'get', side_effect=["output\n", ">>>"]):
with patch.object(manager.input_queue, 'put'):
result = manager.execute_code("test", timeout=30)
assert result == "output\n>>>"
def test_timeout_with_partial_output(self, shell_manager_args, fake_process):
"""Test timeout behavior when some output was received."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
# Mock to receive some output, then timeout
start_time = 1000.0
output_chunks = ["partial output\n", queue.Empty()]
def mock_get(timeout=1):
chunk = output_chunks.pop(0)
if isinstance(chunk, Exception):
raise chunk
return chunk
with patch('time.time', side_effect=[start_time, start_time + 1, start_time + 11]):
with patch.object(manager.output_queue, 'get', side_effect=mock_get):
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError, match="Command execution timed out after 10 seconds"):
manager.execute_code("test", timeout=10)
def test_timeout_edge_case_exact_timing(self, shell_manager_args, fake_process):
"""Test timeout at exact boundary timing."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
# Mock time to hit exactly the timeout boundary - provide enough values
start_time = 1000.0
def time_side_effect():
# First call: start time, subsequent calls: past timeout
calls = getattr(time_side_effect, 'calls', 0)
time_side_effect.calls = calls + 1
if calls == 0:
return start_time
else:
return start_time + 10.1 # Just past timeout
with patch('time.time', side_effect=time_side_effect):
with patch.object(manager.output_queue, 'empty', return_value=True):
with patch.object(manager.output_queue, 'get', side_effect=queue.Empty()):
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError, match="Command execution timed out after 10 seconds"):
manager.execute_code("test", timeout=10)
def test_timeout_message_includes_duration(self, shell_manager_args, fake_process):
"""Test that timeout error message includes the timeout duration."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
start_time = 1000.0
with patch('time.time', side_effect=[start_time, start_time + 46]):
with patch.object(manager.output_queue, 'get', side_effect=queue.Empty()):
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError) as exc_info:
manager.execute_code("test", timeout=45)
assert "45 seconds" in str(exc_info.value)
class TestTimeoutWithRealQueue:
"""Test timeout behavior with real queue operations."""
def test_real_queue_timeout_behavior(self, shell_manager_args, fake_process):
"""Test timeout with actual queue timeout behavior."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
# Use real queue but mock time to control timeout
real_queue = queue.Queue()
manager.output_queue = real_queue
start_time = time.time()
# Mock time to advance rapidly
time_values = [start_time + i for i in range(0, 35, 5)] # 0, 5, 10, 15, 20, 25, 30
with patch('time.time', side_effect=time_values):
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError):
manager.execute_code("test", timeout=25)
def test_queue_get_timeout_parameter(self, shell_manager_args, fake_process):
"""Test that queue.get is called with correct timeout parameter."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
start_time = 1000.0
with patch('time.time', side_effect=[start_time, start_time + 2]):
with patch.object(manager.output_queue, 'get') as mock_get:
mock_get.side_effect = queue.Empty()
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError):
manager.execute_code("test", timeout=1)
# Verify queue.get was called with timeout=1
mock_get.assert_called_with(timeout=1)
class TestTimeoutErrorHandling:
"""Test how timeout errors are handled by the system."""
def test_timeout_preserves_queue_state(self, shell_manager_args, fake_process):
"""Test that timeout doesn't corrupt queue state."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
# Put something in queues before timeout
manager.input_queue.put("pre-existing")
manager.output_queue.put("pre-existing")
start_time = 1000.0
def time_side_effect():
calls = getattr(time_side_effect, 'calls', 0)
time_side_effect.calls = calls + 1
if calls == 0:
return start_time
else:
return start_time + 11 # Past timeout
# Mock queue clearing: first call gets pre-existing, then empty, then timeout in main loop
queue_calls = []
def mock_get(*args, **kwargs):
queue_calls.append(1)
if len(queue_calls) == 1:
return "pre-existing" # Clear existing item
else:
raise queue.Empty() # Then empty, causing timeout
def mock_empty():
return len(queue_calls) >= 1 # Empty after first get call
with patch('time.time', side_effect=time_side_effect):
with patch.object(manager.output_queue, 'empty', side_effect=mock_empty):
with patch.object(manager.output_queue, 'get', side_effect=mock_get):
with pytest.raises(TimeoutError):
manager.execute_code("test", timeout=10)
# Queues should still be accessible after timeout
assert not manager.input_queue.empty()
assert manager.output_queue is not None
def test_timeout_doesnt_affect_process_state(self, shell_manager_args, fake_process):
"""Test that timeout doesn't change process state."""
manager = OdooShellManager(**shell_manager_args)
manager.process = fake_process
start_time = 1000.0
with patch('time.time', side_effect=[start_time, start_time + 11]):
with patch.object(manager.output_queue, 'get', side_effect=queue.Empty()):
with patch.object(manager.input_queue, 'put'):
with pytest.raises(TimeoutError):
manager.execute_code("test", timeout=10)
# Process should still be the same
assert manager.process is fake_process
assert fake_process.is_running # Still running