Skip to main content
Glama
seletz
by seletz
test_execute_code.py12 kB
""" Tests for execute_code functionality with mocked shell processes. """ import queue import time from unittest.mock import Mock, patch import pytest from mcp_odoo_shell.shell_manager import OdooShellManager class TestExecuteCodeFlow: """Test the execute_code method with various scenarios.""" def test_execute_simple_code(self, shell_manager_args, fake_process): """Test executing simple code that returns immediately.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Mock coordinated queue behavior: simulate multi-chunk response with prompt detection chunks = ["4\n", ">>>"] chunk_index = [0] # Use list to allow modification in closure def mock_get(*args, **kwargs): if chunk_index[0] < len(chunks): result = chunks[chunk_index[0]] chunk_index[0] += 1 return result else: raise queue.Empty() with patch.object(manager.output_queue, 'empty', return_value=True): with patch.object(manager.output_queue, 'get', side_effect=mock_get): with patch.object(manager.input_queue, 'put') as mock_put: result = manager.execute_code("2+2") mock_put.assert_called_once_with("2+2") assert result == "4\n>>>" def test_execute_code_with_multiline_output(self, shell_manager_args, fake_process): """Test executing code that produces multiple lines of output.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Simulate multiline output output_chunks = ["line 1\n", "line 2\n", ">>>\n"] with patch.object(manager.output_queue, 'get', side_effect=output_chunks): with patch.object(manager.input_queue, 'put'): result = manager.execute_code("some_code") assert result == "line 1\nline 2\n>>>" def test_execute_code_ipython_prompt(self, shell_manager_args, fake_process): """Test code execution with IPython-style prompt.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Simulate IPython prompt output_chunks = ["result\n", "In [2]: "] with patch.object(manager.output_queue, 'get', side_effect=output_chunks): with patch.object(manager.input_queue, 'put'): result = manager.execute_code("some_code") assert result == "result\nIn [2]:" def test_execute_code_restarts_dead_process(self, shell_manager_args, fake_process): """Test that execute_code restarts the shell if process is dead.""" manager = OdooShellManager(**shell_manager_args) # Simulate dead process dead_process = Mock() dead_process.poll.return_value = 1 # Process is dead manager.process = dead_process with patch.object(manager, 'start_shell') as mock_start: with patch.object(manager.output_queue, 'get', return_value=">>>\n"): with patch.object(manager.input_queue, 'put'): manager.execute_code("test") mock_start.assert_called_once() def test_execute_code_no_process_starts_shell(self, shell_manager_args): """Test that execute_code starts shell if no process exists.""" manager = OdooShellManager(**shell_manager_args) # No process set initially with patch.object(manager, 'start_shell') as mock_start: with patch.object(manager.output_queue, 'get', return_value=">>>\n"): with patch.object(manager.input_queue, 'put'): manager.execute_code("test") mock_start.assert_called_once() def test_execute_code_clears_output_queue(self, shell_manager_args, fake_process): """Test that execute_code clears old output before execution.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Simulate queue state during clearing and execution queue_calls = [] def mock_get(*args, **kwargs): queue_calls.append('get') if len(queue_calls) == 1: return "old output 1" # First clearing call elif len(queue_calls) == 2: return "old output 2" # Second clearing call elif len(queue_calls) == 3: return "new output\n" # First chunk of new result elif len(queue_calls) == 4: return ">>>" # Prompt that ends execution else: raise queue.Empty() def mock_empty(): # Empty after 2 clearing calls return len(queue_calls) >= 2 with patch.object(manager.output_queue, 'empty', side_effect=mock_empty): with patch.object(manager.output_queue, 'get', side_effect=mock_get): with patch.object(manager.input_queue, 'put'): result = manager.execute_code("test") # Should have cleared old output and gotten new output assert result == "new output\n>>>" assert len(queue_calls) >= 4 # 2 for clearing + 2 for new output class TestPromptDetection: """Test prompt detection logic in execute_code.""" def test_detects_python_prompt(self, shell_manager_args, fake_process): """Test detection of standard Python >>> prompt.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Simulate output with >>> prompt in separate chunk output_chunks = ["some output\n", ">>>"] with patch.object(manager.output_queue, 'get', side_effect=output_chunks): with patch.object(manager.input_queue, 'put'): result = manager.execute_code("test") assert ">>>" in result def test_detects_ipython_prompt(self, shell_manager_args, fake_process): """Test detection of IPython In [n]: prompt.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Simulate output with IPython prompt output_chunks = ["result\n", "In [5]: "] with patch.object(manager.output_queue, 'get', side_effect=output_chunks): with patch.object(manager.input_queue, 'put'): result = manager.execute_code("test") assert "In [" in result def test_requires_multiple_chunks_for_completion(self, shell_manager_args, fake_process): """Test that prompt detection requires more than one output chunk.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Single chunk with prompt should NOT break (needs len(output_lines) > 1) # First call returns a prompt, but it's the only chunk, so it should continue # Then subsequent calls timeout chunk_calls = [0] def mock_get(*args, **kwargs): chunk_calls[0] += 1 if chunk_calls[0] == 1: return ">>>" # First chunk with prompt, but only one chunk so far else: raise queue.Empty() # Then empty, should timeout def time_side_effect(): calls = getattr(time_side_effect, 'calls', 0) time_side_effect.calls = calls + 1 if calls == 0: return 0.0 # Start time else: return 999.0 # Past timeout with patch.object(manager.output_queue, 'empty', return_value=True): with patch.object(manager.output_queue, 'get', side_effect=mock_get): with patch.object(manager.input_queue, 'put'): with patch('time.time', side_effect=time_side_effect): with pytest.raises(TimeoutError): manager.execute_code("test", timeout=1) class TestExecuteCodeEdgeCases: """Test edge cases in execute_code.""" def test_execute_empty_code(self, shell_manager_args, fake_process): """Test executing empty code.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Multi-chunk response for prompt detection chunks = ["", ">>>"] chunk_index = [0] def mock_get(*args, **kwargs): if chunk_index[0] < len(chunks): result = chunks[chunk_index[0]] chunk_index[0] += 1 return result else: raise queue.Empty() with patch.object(manager.output_queue, 'empty', return_value=True): with patch.object(manager.output_queue, 'get', side_effect=mock_get): with patch.object(manager.input_queue, 'put') as mock_put: result = manager.execute_code("") mock_put.assert_called_once_with("") assert result == ">>>" def test_execute_code_strips_result(self, shell_manager_args, fake_process): """Test that result is stripped of leading/trailing whitespace.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Multi-chunk response chunks = [" \nresult\n \n", ">>> \n"] chunk_index = [0] def mock_get(*args, **kwargs): if chunk_index[0] < len(chunks): result = chunks[chunk_index[0]] chunk_index[0] += 1 return result else: raise queue.Empty() with patch.object(manager.output_queue, 'empty', return_value=True): with patch.object(manager.output_queue, 'get', side_effect=mock_get): with patch.object(manager.input_queue, 'put'): result = manager.execute_code("test") assert result == "result\n \n>>>" # Leading/trailing stripped only def test_execute_code_with_queue_exception(self, shell_manager_args, fake_process): """Test handling of queue exceptions during execution.""" manager = OdooShellManager(**shell_manager_args) manager.process = fake_process # Mock queue.get to raise exception then return multi-chunk result chunks = ["result\n", ">>>"] chunk_index = [0] def mock_get(*args, **kwargs): if chunk_index[0] == 0: chunk_index[0] += 1 raise queue.Empty() # First call raises exception elif chunk_index[0] < len(chunks) + 1: result = chunks[chunk_index[0] - 1] chunk_index[0] += 1 return result else: raise queue.Empty() # Mock time to not timeout during the test def time_side_effect(): return 1000.0 # Constant time, no timeout with patch.object(manager.output_queue, 'empty', return_value=True): with patch.object(manager.output_queue, 'get', side_effect=mock_get): with patch.object(manager.input_queue, 'put'): with patch('time.time', side_effect=time_side_effect): result = manager.execute_code("test") assert result == "result\n>>>"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seletz/mcp-odoo-shell'

If you have feedback or need assistance with the MCP directory API, please join our Discord server