test_process_pool.py
""" Unit tests for Tiger Process Pool Manager. Tests the TigerProcessPool class that manages isolated worker processes for Tiger SDK single-account limitation. Tests cover: 1. Process lifecycle management (start, stop, restart) 2. Worker process isolation and communication 3. Process failure recovery and health monitoring 4. Task execution and load balancing 5. Resource management and scaling 6. Error handling and timeout management """ import asyncio import uuid from datetime import datetime, timedelta from unittest.mock import AsyncMock, MagicMock, patch import pytest # Import the class under test from mcp_server.tiger_process_pool import ( ProcessInfo, ProcessStatus, TigerProcessPool, get_process_pool, ) class TestTigerProcessPool: """Test suite for TigerProcessPool.""" def setup_method(self): """Setup test method with clean process pool.""" # Reset global instance for clean testing import mcp_server.tiger_process_pool mcp_server.tiger_process_pool._process_pool = None @pytest.fixture def process_pool(self, mock_account_manager): """Create a TigerProcessPool instance for testing.""" with ( patch("mcp_server.tiger_process_pool.get_config") as mock_config, patch( "mcp_server.tiger_process_pool.get_account_manager", return_value=mock_account_manager, ), ): # Setup mock config mock_config.return_value = MagicMock() pool = TigerProcessPool( max_processes=2, process_timeout=30.0, heartbeat_interval=5.0, max_restarts=2, restart_cooldown=10.0, ) return pool @pytest.mark.asyncio async def test_process_pool_initialization(self, process_pool): """Test process pool initialization.""" assert process_pool.max_processes == 2 assert process_pool.process_timeout == 30.0 assert process_pool.heartbeat_interval == 5.0 assert process_pool.max_restarts == 2 assert process_pool.restart_cooldown == 10.0 # Check initial state assert len(process_pool.processes) == 0 assert len(process_pool.account_to_process) == 0 assert len(process_pool.process_pool) == 0 assert process_pool._monitoring_active is False assert process_pool._shutdown is False @pytest.mark.asyncio async def test_process_pool_start_stop(self, process_pool): """Test process pool start and stop operations.""" # Test start await process_pool.start() assert process_pool._monitoring_active is True assert process_pool._monitoring_task is not None # Test stop await process_pool.stop() assert process_pool._monitoring_active is False assert process_pool._shutdown is True @pytest.mark.asyncio async def test_create_process_success( self, process_pool, mock_account_data, mock_multiprocessing ): """Test successful process creation.""" with patch.object( process_pool.account_manager, "get_account_by_id" ) as mock_get_account: # Setup account mock account = mock_account_data.accounts[0] mock_get_account.return_value = account # Setup multiprocessing mocks mock_process = mock_multiprocessing["process"] mock_queue = mock_multiprocessing["queue"] # Mock ready signal from worker ready_message = { "type": "ready", "timestamp": datetime.utcnow().isoformat(), } mock_queue.empty.return_value = False mock_queue.get_nowait.return_value = ready_message # Execute process creation process_id = str(uuid.uuid4()) account_id = account.id result_process_id = await process_pool._create_process( process_id, account_id ) # Verify results assert result_process_id == process_id assert process_id in process_pool.processes assert account_id in process_pool.account_to_process assert process_pool.account_to_process[account_id] == process_id # Verify process info process_info = 
process_pool.processes[process_id] assert process_info.process_id == process_id assert process_info.account_id == account_id assert process_info.account_number == account.account_number assert process_info.status == ProcessStatus.READY assert process_info.pid == mock_process.pid # Verify process was started mock_process.start.assert_called_once() @pytest.mark.asyncio async def test_create_process_timeout( self, process_pool, mock_account_data, mock_multiprocessing ): """Test process creation timeout.""" with patch.object( process_pool.account_manager, "get_account_by_id" ) as mock_get_account: # Setup account mock account = mock_account_data.accounts[0] mock_get_account.return_value = account # Setup multiprocessing mocks - no ready signal mock_queue = mock_multiprocessing["queue"] mock_queue.empty.return_value = True # No ready message # Execute process creation process_id = str(uuid.uuid4()) account_id = account.id with pytest.raises( RuntimeError, match="Worker process failed to start within timeout" ): await process_pool._create_process(process_id, account_id) # Verify cleanup occurred assert process_id not in process_pool.processes assert account_id not in process_pool.account_to_process @pytest.mark.asyncio async def test_create_process_worker_died( self, process_pool, mock_account_data, mock_multiprocessing ): """Test process creation when worker dies during startup.""" with patch.object( process_pool.account_manager, "get_account_by_id" ) as mock_get_account: # Setup account mock account = mock_account_data.accounts[0] mock_get_account.return_value = account # Setup multiprocessing mocks - process dies mock_process = mock_multiprocessing["process"] mock_process.is_alive.return_value = False # Process died # Execute process creation process_id = str(uuid.uuid4()) account_id = account.id with pytest.raises( RuntimeError, match="Worker process died during startup" ): await process_pool._create_process(process_id, account_id) @pytest.mark.asyncio async def test_get_or_create_process_existing( self, process_pool, mock_account_data ): """Test getting existing process for account.""" # Setup existing process account = mock_account_data.accounts[0] account_id = account.id process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account_id, account_number=account.account_number, status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account_id] = process_id # Execute result_process_id = await process_pool.get_or_create_process(account_id) # Verify existing process is returned assert result_process_id == process_id @pytest.mark.asyncio async def test_get_or_create_process_max_limit( self, process_pool, mock_account_data ): """Test process creation when max limit is reached.""" # Fill up process pool to max capacity for i in range(process_pool.max_processes): mock_account_data.accounts[0] process_id = f"process_{i}" process_info = ProcessInfo( process_id=process_id, account_id=f"account_{i}", account_number=f"DU{i:06d}", status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info # Try to create one more process new_account_id = str(uuid.uuid4()) with pytest.raises(RuntimeError, match="Maximum processes .* reached"): await process_pool.get_or_create_process(new_account_id) @pytest.mark.asyncio async def test_execute_task_success( self, process_pool, mock_account_data, mock_multiprocessing ): """Test successful task execution.""" # Setup existing process account = 
mock_account_data.accounts[0] account_id = account.id process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account_id, account_number=account.account_number, status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account_id] = process_id # Setup queues task_queue = mock_multiprocessing["queue"] result_queue = mock_multiprocessing["queue"] process_pool.task_queues[process_id] = task_queue process_pool.result_queues[process_id] = result_queue # Mock task response task_response = { "task_id": "test_task_123", "success": True, "result": {"quote": {"symbol": "AAPL", "price": 150.25}}, "execution_time": 0.5, "timestamp": datetime.utcnow().isoformat(), } # Mock async queue operations async def mock_put_queue_async(queue, item, timeout=None): pass async def mock_get_queue_async(queue, timeout=None): return task_response process_pool._put_queue_async = mock_put_queue_async process_pool._get_queue_async = mock_get_queue_async # Execute task result = await process_pool.execute_task( account_id=account_id, method="get_quote", args=["AAPL"], kwargs={}, timeout=30.0, ) # Verify results assert result == task_response["result"] # Verify process status updated assert process_info.status == ProcessStatus.READY assert process_info.current_task is None @pytest.mark.asyncio async def test_execute_task_timeout(self, process_pool, mock_account_data): """Test task execution timeout.""" # Setup existing process account = mock_account_data.accounts[0] account_id = account.id process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account_id, account_number=account.account_number, status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account_id] = process_id # Mock timeout in queue operations async def mock_put_queue_async(queue, item, timeout=None): pass async def mock_get_queue_async(queue, timeout=None): raise TimeoutError("Task execution timed out") process_pool._put_queue_async = mock_put_queue_async process_pool._get_queue_async = mock_get_queue_async # Mock restart process process_pool._restart_process = AsyncMock() # Execute task with pytest.raises(TimeoutError, match="Task execution timed out"): await process_pool.execute_task( account_id=account_id, method="get_quote", args=["AAPL"], timeout=5.0 ) # Verify process status and restart assert process_info.status == ProcessStatus.ERROR process_pool._restart_process.assert_called_once_with(process_id) @pytest.mark.asyncio async def test_execute_task_failure(self, process_pool, mock_account_data): """Test task execution failure.""" # Setup existing process account = mock_account_data.accounts[0] account_id = account.id process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account_id, account_number=account.account_number, status=ProcessStatus.READY, error_count=2, # Already has some errors ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account_id] = process_id # Mock task failure response task_response = { "task_id": "test_task_123", "success": False, "result": None, "error": "API call failed", "execution_time": 0.5, "timestamp": datetime.utcnow().isoformat(), } # Mock async queue operations async def mock_put_queue_async(queue, item, timeout=None): pass async def mock_get_queue_async(queue, timeout=None): return task_response process_pool._put_queue_async = mock_put_queue_async 
process_pool._get_queue_async = mock_get_queue_async # Mock restart process (will be called due to error count) process_pool._restart_process = AsyncMock() # Execute task with pytest.raises( RuntimeError, match="Task execution failed: API call failed" ): await process_pool.execute_task( account_id=account_id, method="get_quote", args=["AAPL"] ) # Verify error count increased and restart was called assert process_info.error_count == 3 process_pool._restart_process.assert_called_once_with(process_id) @pytest.mark.asyncio async def test_restart_process( self, process_pool, mock_account_data, mock_multiprocessing ): """Test process restart operation.""" # Setup existing process account = mock_account_data.accounts[0] account_id = account.id process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account_id, account_number=account.account_number, status=ProcessStatus.ERROR, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account_id] = process_id # Mock remove and create operations process_pool._remove_process = AsyncMock(return_value=True) process_pool._create_process = AsyncMock(return_value=process_id) # Execute restart result = await process_pool._restart_process(process_id) # Verify results assert result is True process_pool._remove_process.assert_called_once_with(process_id) process_pool._create_process.assert_called_once_with(process_id, account_id) @pytest.mark.asyncio async def test_remove_process_graceful(self, process_pool, mock_multiprocessing): """Test graceful process removal.""" # Setup existing process process_id = str(uuid.uuid4()) account_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account_id, account_number="DU123456", status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account_id] = process_id # Setup mock process and queues mock_process = mock_multiprocessing["process"] mock_queue = mock_multiprocessing["queue"] process_pool.process_pool[process_id] = mock_process process_pool.task_queues[process_id] = mock_queue process_pool.result_queues[process_id] = mock_queue # Mock graceful shutdown mock_process.join.return_value = None # Process stops gracefully # Execute removal result = await process_pool._remove_process(process_id) # Verify results assert result is True assert process_id not in process_pool.processes assert account_id not in process_pool.account_to_process assert process_id not in process_pool.process_pool assert process_info.status == ProcessStatus.STOPPED # Verify graceful shutdown was attempted mock_queue.put_nowait.assert_called_once() mock_process.join.assert_called() @pytest.mark.asyncio async def test_remove_process_force_terminate( self, process_pool, mock_multiprocessing ): """Test forced process termination.""" # Setup existing process process_id = str(uuid.uuid4()) account_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account_id, account_number="DU123456", status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account_id] = process_id # Setup mock process that won't stop gracefully mock_process = mock_multiprocessing["process"] mock_process.is_alive.side_effect = [ True, True, False, ] # Alive after terminate, dead after kill mock_queue = mock_multiprocessing["queue"] process_pool.process_pool[process_id] = mock_process process_pool.task_queues[process_id] = mock_queue 
process_pool.result_queues[process_id] = mock_queue # Execute removal result = await process_pool._remove_process(process_id) # Verify results assert result is True # Verify forced termination was used mock_process.terminate.assert_called_once() mock_process.kill.assert_called_once() @pytest.mark.asyncio async def test_process_monitoring( self, process_pool, mock_account_data, mock_multiprocessing ): """Test process health monitoring.""" # Setup existing processes processes = [] for i, account in enumerate(mock_account_data.active_accounts): process_id = f"process_{i}" process_info = ProcessInfo( process_id=process_id, account_id=account.id, account_number=account.account_number, status=ProcessStatus.READY, last_heartbeat=datetime.utcnow() - timedelta(seconds=5), ) processes.append((process_id, process_info, account)) process_pool.processes[process_id] = process_info process_pool.account_to_process[account.id] = process_id # Setup mock process and queues mock_process = mock_multiprocessing["process"] mock_queue = mock_multiprocessing["queue"] process_pool.process_pool[process_id] = mock_process process_pool.task_queues[process_id] = mock_queue # Start monitoring process_pool._monitoring_active = True # Create monitoring task that runs once async def single_monitor_cycle(): current_time = datetime.utcnow() for process_id, process_info in list(process_pool.processes.items()): # Check if process is still alive process = process_pool.process_pool.get(process_id) if process and process.is_alive(): # Send heartbeat check if process_info.status == ProcessStatus.READY: task_queue = process_pool.task_queues.get(process_id) if task_queue: heartbeat_task = { "type": "heartbeat", "timestamp": current_time.isoformat(), } task_queue.put_nowait(heartbeat_task) # Run monitoring cycle await single_monitor_cycle() # Verify heartbeat messages were sent to ready processes for process_id, process_info, account in processes: if process_info.status == ProcessStatus.READY: task_queue = process_pool.task_queues[process_id] task_queue.put_nowait.assert_called() @pytest.mark.asyncio async def test_process_monitoring_dead_process( self, process_pool, mock_account_data, mock_multiprocessing ): """Test monitoring detects and restarts dead processes.""" # Setup existing process account = mock_account_data.accounts[0] process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account.id, account_number=account.account_number, status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info # Setup dead mock process mock_process = mock_multiprocessing["process"] mock_process.is_alive.return_value = False # Process is dead process_pool.process_pool[process_id] = mock_process # Mock restart process process_pool._restart_process = AsyncMock() # Simulate monitoring check datetime.utcnow() process = process_pool.process_pool.get(process_id) if process and not process.is_alive(): await process_pool._restart_process(process_id) # Verify restart was called process_pool._restart_process.assert_called_once_with(process_id) @pytest.mark.asyncio async def test_process_monitoring_heartbeat_timeout( self, process_pool, mock_account_data ): """Test monitoring detects heartbeat timeouts.""" # Setup existing process with old heartbeat account = mock_account_data.accounts[0] process_id = str(uuid.uuid4()) old_heartbeat = datetime.utcnow() - timedelta( seconds=process_pool.process_timeout + 10 ) process_info = ProcessInfo( process_id=process_id, account_id=account.id, 
account_number=account.account_number, status=ProcessStatus.READY, last_heartbeat=old_heartbeat, ) process_pool.processes[process_id] = process_info # Mock restart process process_pool._restart_process = AsyncMock() # Simulate heartbeat timeout check current_time = datetime.utcnow() if process_info.last_heartbeat: heartbeat_age = (current_time - process_info.last_heartbeat).total_seconds() if heartbeat_age > process_pool.process_timeout: await process_pool._restart_process(process_id) # Verify restart was called process_pool._restart_process.assert_called_once_with(process_id) @pytest.mark.asyncio async def test_get_process_status(self, process_pool, mock_account_data): """Test getting process status for account.""" # Setup existing process account = mock_account_data.accounts[0] process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account.id, account_number=account.account_number, status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account.id] = process_id # Execute result = await process_pool.get_process_status(account.id) # Verify assert result == process_info # Test non-existent account result = await process_pool.get_process_status("non_existent_account") assert result is None @pytest.mark.asyncio async def test_get_all_processes(self, process_pool, mock_account_data): """Test getting all process statuses.""" # Setup multiple processes processes = [] for i, account in enumerate(mock_account_data.active_accounts): process_id = f"process_{i}" process_info = ProcessInfo( process_id=process_id, account_id=account.id, account_number=account.account_number, status=ProcessStatus.READY, ) processes.append(process_info) process_pool.processes[process_id] = process_info # Execute result = await process_pool.get_all_processes() # Verify assert len(result) == len(processes) assert set(result) == set(processes) @pytest.mark.asyncio async def test_restart_process_by_account(self, process_pool, mock_account_data): """Test restarting process by account ID.""" # Setup existing process account = mock_account_data.accounts[0] process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account.id, account_number=account.account_number, status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account.id] = process_id # Mock restart process process_pool._restart_process = AsyncMock(return_value=True) # Execute result = await process_pool.restart_process(account.id) # Verify assert result is True process_pool._restart_process.assert_called_once_with(process_id) # Test non-existent account result = await process_pool.restart_process("non_existent_account") assert result is False @pytest.mark.asyncio async def test_remove_process_by_account(self, process_pool, mock_account_data): """Test removing process by account ID.""" # Setup existing process account = mock_account_data.accounts[0] process_id = str(uuid.uuid4()) process_info = ProcessInfo( process_id=process_id, account_id=account.id, account_number=account.account_number, status=ProcessStatus.READY, ) process_pool.processes[process_id] = process_info process_pool.account_to_process[account.id] = process_id # Mock remove process process_pool._remove_process = AsyncMock(return_value=True) # Execute result = await process_pool.remove_process(account.id) # Verify assert result is True process_pool._remove_process.assert_called_once_with(process_id) # Test non-existent 
account result = await process_pool.remove_process("non_existent_account") assert result is False @pytest.mark.asyncio async def test_stop_all_processes(self, process_pool, mock_account_data): """Test stopping all processes.""" # Setup multiple processes process_ids = [] for i, account in enumerate(mock_account_data.active_accounts): process_id = f"process_{i}" process_info = ProcessInfo( process_id=process_id, account_id=account.id, account_number=account.account_number, status=ProcessStatus.READY, ) process_ids.append(process_id) process_pool.processes[process_id] = process_info # Mock remove process process_pool._remove_process = AsyncMock(return_value=True) # Execute await process_pool._stop_all_processes() # Verify all processes were removed assert process_pool._remove_process.call_count == len(process_ids) for process_id in process_ids: process_pool._remove_process.assert_any_call(process_id) def test_get_process_pool_singleton(self): """Test global process pool singleton.""" # Get first instance pool1 = get_process_pool() # Get second instance pool2 = get_process_pool() # Should be the same instance assert pool1 is pool2 assert isinstance(pool1, TigerProcessPool) class TestProcessPoolIntegration: """Integration tests for TigerProcessPool.""" @pytest.mark.integration @pytest.mark.asyncio async def test_process_pool_full_lifecycle( self, mock_account_manager, mock_account_data, mock_multiprocessing ): """Test complete process pool lifecycle.""" with patch("mcp_server.tiger_process_pool.get_config") as mock_config: # Setup mock config mock_config.return_value = MagicMock() # Create process pool pool = TigerProcessPool(max_processes=2, heartbeat_interval=1.0) try: # Start pool await pool.start() assert pool._monitoring_active is True # Setup account account = mock_account_data.accounts[0] mock_account_manager.get_account_by_id.return_value = account # Mock process creation success mock_queue = mock_multiprocessing["queue"] ready_message = {"type": "ready"} mock_queue.empty.return_value = False mock_queue.get_nowait.return_value = ready_message # Create process for account process_id = await pool.get_or_create_process(account.id) assert process_id is not None assert account.id in pool.account_to_process # Mock task execution task_response = { "task_id": "test_task", "success": True, "result": {"data": "test_result"}, "execution_time": 0.1, "timestamp": datetime.utcnow().isoformat(), } async def mock_put_queue_async(queue, item, timeout=None): pass async def mock_get_queue_async(queue, timeout=None): return task_response pool._put_queue_async = mock_put_queue_async pool._get_queue_async = mock_get_queue_async # Execute task result = await pool.execute_task( account_id=account.id, method="test_method", args=["test_arg"] ) assert result == task_response["result"] # Get process status status = await pool.get_process_status(account.id) assert status is not None assert status.status == ProcessStatus.READY # Get all processes all_processes = await pool.get_all_processes() assert len(all_processes) == 1 # Restart process restart_result = await pool.restart_process(account.id) # Result depends on mocking, but should not raise exception assert isinstance(restart_result, bool) finally: # Stop pool await pool.stop() assert pool._shutdown is True @pytest.mark.integration @pytest.mark.slow @pytest.mark.asyncio async def test_process_pool_concurrent_operations( self, mock_account_manager, mock_account_data, mock_multiprocessing ): """Test concurrent operations on process pool.""" with 
patch("mcp_server.tiger_process_pool.get_config") as mock_config: # Setup mock config mock_config.return_value = MagicMock() # Create process pool pool = TigerProcessPool(max_processes=3) try: await pool.start() # Setup accounts accounts = mock_account_data.active_accounts mock_account_manager.get_account_by_id.side_effect = ( lambda account_id: next( (acc for acc in accounts if acc.id == str(account_id)), None ) ) # Mock successful process creation mock_queue = mock_multiprocessing["queue"] ready_message = {"type": "ready"} mock_queue.empty.return_value = False mock_queue.get_nowait.return_value = ready_message # Create processes concurrently create_tasks = [ pool.get_or_create_process(account.id) for account in accounts[:2] # Only use 2 accounts ] process_ids = await asyncio.gather(*create_tasks) assert len(process_ids) == 2 assert all(pid is not None for pid in process_ids) # Mock task execution task_response = { "task_id": "test_task", "success": True, "result": {"data": "concurrent_result"}, "execution_time": 0.1, "timestamp": datetime.utcnow().isoformat(), } async def mock_put_queue_async(queue, item, timeout=None): await asyncio.sleep(0.01) # Simulate some delay async def mock_get_queue_async(queue, timeout=None): await asyncio.sleep(0.02) # Simulate processing time return task_response pool._put_queue_async = mock_put_queue_async pool._get_queue_async = mock_get_queue_async # Execute tasks concurrently on different accounts task_execution_tasks = [ pool.execute_task( account_id=account.id, method=f"test_method_{i}", args=[f"test_arg_{i}"], ) for i, account in enumerate(accounts[:2]) ] start_time = asyncio.get_event_loop().time() results = await asyncio.gather(*task_execution_tasks) end_time = asyncio.get_event_loop().time() # Verify results assert len(results) == 2 assert all(result == task_response["result"] for result in results) # Verify concurrent execution (should be faster than sequential) execution_time = end_time - start_time assert ( execution_time < 0.1 ) # Much faster than 2 * (0.01 + 0.02) sequential # Get status of all processes status_tasks = [ pool.get_process_status(account.id) for account in accounts[:2] ] statuses = await asyncio.gather(*status_tasks) assert len(statuses) == 2 assert all(status is not None for status in statuses) assert all(status.status == ProcessStatus.READY for status in statuses) finally: await pool.stop() @pytest.mark.integration @pytest.mark.asyncio async def test_process_pool_error_recovery( self, mock_account_manager, mock_account_data, mock_multiprocessing ): """Test process pool error recovery mechanisms.""" with patch("mcp_server.tiger_process_pool.get_config") as mock_config: # Setup mock config mock_config.return_value = MagicMock() # Create process pool pool = TigerProcessPool(max_processes=2, max_restarts=1) try: await pool.start() # Setup account account = mock_account_data.accounts[0] mock_account_manager.get_account_by_id.return_value = account # Mock process creation success initially mock_queue = mock_multiprocessing["queue"] ready_message = {"type": "ready"} mock_queue.empty.return_value = False mock_queue.get_nowait.return_value = ready_message # Create process process_id = await pool.get_or_create_process(account.id) assert process_id is not None # Simulate process failure during task execution error_response = { "task_id": "failing_task", "success": False, "result": None, "error": "Simulated API failure", "execution_time": 0.1, "timestamp": datetime.utcnow().isoformat(), } async def mock_put_queue_async(queue, item, 
                async def mock_put_queue_async(queue, item, timeout=None):
                    pass

                async def mock_get_queue_async(queue, timeout=None):
                    return error_response

                pool._put_queue_async = mock_put_queue_async
                pool._get_queue_async = mock_get_queue_async

                # Mock restart operations
                pool._remove_process = AsyncMock(return_value=True)
                pool._create_process = AsyncMock(return_value=process_id)

                # Execute failing task multiple times to trigger restart
                process_info = pool.processes[process_id]
                original_error_count = process_info.error_count

                # This should fail and increment error count
                with pytest.raises(RuntimeError, match="Task execution failed"):
                    await pool.execute_task(
                        account_id=account.id, method="failing_method", args=["test"]
                    )

                # Verify error count increased
                assert process_info.error_count > original_error_count

                # Simulate multiple failures to trigger restart threshold
                process_info.error_count = 3  # Set to trigger restart

                with pytest.raises(RuntimeError):
                    await pool.execute_task(
                        account_id=account.id, method="failing_method", args=["test"]
                    )

                # Verify restart was attempted; the real _restart_process delegates
                # to the mocked _remove_process/_create_process helpers
                pool._remove_process.assert_called()

            finally:
                await pool.stop()
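
For orientation, the snippet below sketches how the pooled API exercised by these tests might be driven outside of pytest: obtain the singleton pool, start monitoring, route a call to the per-account worker, and shut down. It is a minimal sketch based only on the calls the tests above make; the "demo-account-id" value and the get_quote arguments are placeholders, and the error behavior noted in the comments reflects what the tests expect rather than a documented contract.

# Hedged usage sketch (not part of test_process_pool.py). Uses only methods
# exercised by the tests above; "demo-account-id" and "AAPL" are placeholders.
import asyncio

from mcp_server.tiger_process_pool import get_process_pool


async def main() -> None:
    pool = get_process_pool()  # global singleton, as asserted in the tests
    await pool.start()  # starts the health-monitoring loop
    try:
        # Lazily spawn (or reuse) the isolated worker bound to this account.
        await pool.get_or_create_process("demo-account-id")

        # Route a call to the per-account worker. Per the tests, a timeout
        # surfaces as TimeoutError and a worker-side failure as
        # RuntimeError("Task execution failed: ...").
        quote = await pool.execute_task(
            account_id="demo-account-id",
            method="get_quote",
            args=["AAPL"],
            kwargs={},
            timeout=30.0,
        )
        print(quote)

        # Inspect the worker's ProcessInfo (None if no worker exists).
        status = await pool.get_process_status("demo-account-id")
        print(status.status if status else "no worker")
    finally:
        await pool.stop()  # stops monitoring and marks the pool shut down


if __name__ == "__main__":
    asyncio.run(main())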
