"""
Tests for execute_code functionality with mocked shell processes.
"""
import queue
import time
from unittest.mock import Mock, patch
import pytest
from mcp_odoo_shell.shell_manager import OdooShellManager
class TestExecuteCodeFlow:
    """Drive ``execute_code`` through its main paths using patched queues."""

    def test_execute_simple_code(self, shell_manager_args, fake_process):
        """A value chunk followed by a ``>>>`` chunk comes back as one joined string."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # Replay the scripted chunks in order; once they run out, behave like
        # an empty queue by raising queue.Empty on every further call.
        exhausted = object()
        scripted = iter(["4\n", ">>>"])

        def fake_get(*args, **kwargs):
            chunk = next(scripted, exhausted)
            if chunk is exhausted:
                raise queue.Empty()
            return chunk

        with patch.object(manager.output_queue, 'empty', return_value=True), \
                patch.object(manager.output_queue, 'get', side_effect=fake_get), \
                patch.object(manager.input_queue, 'put') as put_spy:
            result = manager.execute_code("2+2")

        put_spy.assert_called_once_with("2+2")
        assert result == "4\n>>>"

    def test_execute_code_with_multiline_output(self, shell_manager_args, fake_process):
        """Several output lines ending in a prompt chunk are concatenated and stripped."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # Three successive reads: two content lines, then the prompt.
        with patch.object(manager.output_queue, 'get',
                          side_effect=["line 1\n", "line 2\n", ">>>\n"]), \
                patch.object(manager.input_queue, 'put'):
            result = manager.execute_code("some_code")

        assert result == "line 1\nline 2\n>>>"

    def test_execute_code_ipython_prompt(self, shell_manager_args, fake_process):
        """An ``In [n]: `` chunk terminates the read just like a ``>>>`` prompt."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # Content followed by an IPython-style prompt.
        with patch.object(manager.output_queue, 'get',
                          side_effect=["result\n", "In [2]: "]), \
                patch.object(manager.input_queue, 'put'):
            result = manager.execute_code("some_code")

        assert result == "result\nIn [2]:"

    def test_execute_code_restarts_dead_process(self, shell_manager_args, fake_process):
        """``start_shell`` is invoked exactly once when the process reports an exit code."""
        manager = OdooShellManager(**shell_manager_args)

        # poll() returning a non-None value stands in for a terminated process.
        exited = Mock()
        exited.poll.return_value = 1
        manager.process = exited

        with patch.object(manager, 'start_shell') as start_spy, \
                patch.object(manager.output_queue, 'get', return_value=">>>\n"), \
                patch.object(manager.input_queue, 'put'):
            manager.execute_code("test")

        start_spy.assert_called_once()

    def test_execute_code_no_process_starts_shell(self, shell_manager_args):
        """``start_shell`` is invoked exactly once when no process was ever assigned."""
        manager = OdooShellManager(**shell_manager_args)

        # manager.process is deliberately left as the constructor set it.
        with patch.object(manager, 'start_shell') as start_spy, \
                patch.object(manager.output_queue, 'get', return_value=">>>\n"), \
                patch.object(manager.input_queue, 'put'):
            manager.execute_code("test")

        start_spy.assert_called_once()

    def test_execute_code_clears_output_queue(self, shell_manager_args, fake_process):
        """Stale chunks read before execution do not leak into the returned result."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # Every get() is logged; the reply depends on how many calls came before.
        get_log = []
        replies = {
            1: "old output 1",   # first stale chunk, expected to be discarded
            2: "old output 2",   # second stale chunk, expected to be discarded
            3: "new output\n",   # first chunk of the fresh result
            4: ">>>",            # prompt that ends the read
        }

        def fake_get(*args, **kwargs):
            get_log.append('get')
            call_number = len(get_log)
            if call_number not in replies:
                raise queue.Empty()
            return replies[call_number]

        # empty() only turns True after the two stale chunks have been read.
        with patch.object(manager.output_queue, 'empty',
                          side_effect=lambda: len(get_log) >= 2), \
                patch.object(manager.output_queue, 'get', side_effect=fake_get), \
                patch.object(manager.input_queue, 'put'):
            result = manager.execute_code("test")

        # Only the fresh output survives, and all four scripted reads happened.
        assert result == "new output\n>>>"
        assert len(get_log) >= 4  # 2 discarded reads + 2 result reads
class TestPromptDetection:
    """Check which output sequences ``execute_code`` treats as a finished command."""

    def test_detects_python_prompt(self, shell_manager_args, fake_process):
        """A ``>>>`` chunk arriving after some output ends up in the result."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # The prompt is delivered as its own chunk after the content.
        with patch.object(manager.output_queue, 'get',
                          side_effect=["some output\n", ">>>"]), \
                patch.object(manager.input_queue, 'put'):
            result = manager.execute_code("test")

        assert ">>>" in result

    def test_detects_ipython_prompt(self, shell_manager_args, fake_process):
        """An ``In [n]: `` chunk arriving after some output ends up in the result."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # Content followed by an IPython-style prompt.
        with patch.object(manager.output_queue, 'get',
                          side_effect=["result\n", "In [5]: "]), \
                patch.object(manager.input_queue, 'put'):
            result = manager.execute_code("test")

        assert "In [" in result

    def test_requires_multiple_chunks_for_completion(self, shell_manager_args, fake_process):
        """A prompt delivered as the only chunk must not complete the call; it times out."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # The test expects a lone prompt chunk not to count as completion
        # (the original note cites a ``len(output_lines) > 1`` requirement).
        # The queue hands out that one prompt and is empty from then on.
        nothing_left = object()
        only_chunk = iter([">>>"])

        def fake_get(*args, **kwargs):
            chunk = next(only_chunk, nothing_left)
            if chunk is nothing_left:
                raise queue.Empty()
            return chunk

        # Fake clock: the first reading is the start time, every later reading
        # is far beyond the one-second timeout.
        clock_reads = []

        def fake_time():
            clock_reads.append(None)
            return 0.0 if len(clock_reads) == 1 else 999.0

        with patch.object(manager.output_queue, 'empty', return_value=True), \
                patch.object(manager.output_queue, 'get', side_effect=fake_get), \
                patch.object(manager.input_queue, 'put'), \
                patch('time.time', side_effect=fake_time), \
                pytest.raises(TimeoutError):
            manager.execute_code("test", timeout=1)
class TestExecuteCodeEdgeCases:
    """Boundary scenarios for ``execute_code``: empty input, whitespace, queue gaps."""

    @staticmethod
    def _replay(chunks):
        """Return a ``get`` stand-in that yields *chunks* in order, then raises ``queue.Empty``."""
        remaining = iter(chunks)
        drained = object()

        def fake_get(*args, **kwargs):
            item = next(remaining, drained)
            if item is drained:
                raise queue.Empty()
            return item

        return fake_get

    def test_execute_empty_code(self, shell_manager_args, fake_process):
        """An empty code string is still queued, and only the prompt comes back."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # An empty first chunk plus the prompt gives the two chunks needed.
        fake_get = self._replay(["", ">>>"])

        with patch.object(manager.output_queue, 'empty', return_value=True), \
                patch.object(manager.output_queue, 'get', side_effect=fake_get), \
                patch.object(manager.input_queue, 'put') as put_spy:
            result = manager.execute_code("")

        put_spy.assert_called_once_with("")
        assert result == ">>>"

    def test_execute_code_strips_result(self, shell_manager_args, fake_process):
        """Whitespace is trimmed from both ends of the result but kept in the middle."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # Both chunks carry padding at their edges.
        fake_get = self._replay([" \nresult\n \n", ">>> \n"])

        with patch.object(manager.output_queue, 'empty', return_value=True), \
                patch.object(manager.output_queue, 'get', side_effect=fake_get), \
                patch.object(manager.input_queue, 'put'):
            result = manager.execute_code("test")

        # Interior whitespace survives; only the outer edges are trimmed.
        assert result == "result\n \n>>>"

    def test_execute_code_with_queue_exception(self, shell_manager_args, fake_process):
        """A ``queue.Empty`` on the first read is tolerated and later chunks are collected."""
        manager = OdooShellManager(**shell_manager_args)
        manager.process = fake_process

        # Script: one Empty up front (marked by None), then the two real
        # chunks, then Empty on every further call.
        script = iter([None, "result\n", ">>>"])

        def fake_get(*args, **kwargs):
            item = next(script, None)
            if item is None:
                raise queue.Empty()
            return item

        # Frozen clock, so no elapsed time is ever measured and the call cannot time out.
        with patch.object(manager.output_queue, 'empty', return_value=True), \
                patch.object(manager.output_queue, 'get', side_effect=fake_get), \
                patch.object(manager.input_queue, 'put'), \
                patch('time.time', side_effect=lambda: 1000.0):
            result = manager.execute_code("test")

        assert result == "result\n>>>"