Skip to main content
Glama
test_server.py•36.2 kB
""" Unit tests for Tiger MCP Server. Tests the TigerMCPServer class that orchestrates all components including: 1. Server initialization and lifecycle management 2. Service orchestration and dependency management 3. Background task management and monitoring 4. Configuration loading and validation 5. Health status reporting and monitoring 6. Graceful shutdown and resource cleanup """ import asyncio import signal import sys from unittest.mock import AsyncMock, MagicMock, patch import pytest # Import the class under test from mcp_server.server import TigerMCPServer class TestTigerMCPServer: """Test suite for TigerMCPServer.""" @pytest.mark.asyncio async def test_server_initialization(self, mock_config): """Test server initialization with default parameters.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager # Create server server = TigerMCPServer() # Verify initial state assert server.config_manager is not None assert server.config is None assert server.account_manager is None assert server.account_router is None assert server.process_manager is None assert server.tiger_service is None assert len(server.background_tasks) == 0 assert not server._started assert not server.shutdown_event.is_set() @pytest.mark.asyncio async def test_server_initialization_with_config(self, mock_config): """Test server initialization with custom config file.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager # Create server with custom config server = TigerMCPServer( config_file="/path/to/custom/config.yaml", environment="testing" ) # Verify config manager was called with parameters mock_get_config_manager.assert_called_once_with( config_file="/path/to/custom/config.yaml", environment="testing" ) @pytest.mark.asyncio async def test_server_initialize_success( self, mock_config, mock_db_manager, mock_account_manager, mock_account_router, mock_process_manager, mock_tiger_service, ): """Test successful server initialization.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_config_manager.load_config.return_value = mock_config mock_get_config_manager.return_value = mock_config_manager # Create server server = TigerMCPServer() # Mock all initialization steps with ( patch.object(server, "_configure_logging") as mock_configure_logging, patch.object(server, "_initialize_database") as mock_init_db, patch.object( server, "_initialize_account_services" ) as mock_init_accounts, patch.object(server, "_initialize_process_pool") as mock_init_process, patch.object(server, "_initialize_tiger_service") as mock_init_tiger, patch.object(server, "_start_background_tasks") as mock_start_tasks, ): # Execute initialization await server.initialize() # Verify initialization order and calls mock_config_manager.load_config.assert_called_once() mock_configure_logging.assert_called_once() mock_init_db.assert_called_once() mock_init_accounts.assert_called_once() mock_init_process.assert_called_once() mock_init_tiger.assert_called_once() mock_start_tasks.assert_called_once() # Verify final state assert server.config == mock_config assert server._started is True @pytest.mark.asyncio async def test_server_initialize_failure(self, mock_config): """Test server initialization failure and cleanup.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_config_manager.load_config.return_value = mock_config mock_get_config_manager.return_value = mock_config_manager # Create server server = TigerMCPServer() # Mock failure during process pool initialization with ( patch.object(server, "_configure_logging"), patch.object(server, "_initialize_database"), patch.object(server, "_initialize_account_services"), patch.object(server, "_initialize_process_pool") as mock_init_process, patch.object(server, "cleanup") as mock_cleanup, ): mock_init_process.side_effect = RuntimeError( "Process pool initialization failed" ) # Execute initialization and expect failure with pytest.raises( RuntimeError, match="Process pool initialization failed" ): await server.initialize() # Verify cleanup was called mock_cleanup.assert_called_once() assert server._started is False @pytest.mark.asyncio async def test_configure_logging(self, mock_config): """Test logging configuration.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config with patch("loguru.logger") as mock_logger: # Execute logging configuration server._configure_logging() # Verify logger configuration mock_logger.remove.assert_called_once() assert mock_logger.add.call_count >= 1 # At least console logging # Check if production logging is configured if mock_config.environment == "production": assert mock_logger.add.call_count == 2 # Console + file else: assert mock_logger.add.call_count == 1 # Console only @pytest.mark.asyncio async def test_initialize_database_success(self, mock_config, mock_db_manager): """Test successful database initialization.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config # Execute database initialization await server._initialize_database() # Verify database manager initialization mock_db_manager.initialize.assert_called_once_with( database_url=mock_config.database.url, echo=mock_config.database.echo ) @pytest.mark.asyncio async def test_initialize_database_import_error(self, mock_config, capture_logs): """Test database initialization when database package is not available.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config # Mock import error with patch( "mcp_server.server.get_db_manager", side_effect=ImportError("Database package not available"), ): # Should not raise exception, just log warning await server._initialize_database() # Check that warning was logged log_messages = [record.message for record in capture_logs] assert any( "Database package not available" in msg for msg in log_messages ) @pytest.mark.asyncio async def test_initialize_account_services( self, mock_config, mock_account_manager, mock_account_router ): """Test account services initialization.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config # Execute account services initialization await server._initialize_account_services() # Verify services were initialized assert server.account_manager is not None assert server.account_router is not None mock_account_manager.initialize.assert_called_once() @pytest.mark.asyncio async def test_initialize_process_pool(self, mock_config, mock_process_manager): """Test process pool initialization.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config # Execute process pool initialization await server._initialize_process_pool() # Verify process manager configuration and start assert server.process_manager is not None mock_process_manager.configure.assert_called_once() mock_process_manager.start.assert_called_once() # Verify configuration parameters config_call = mock_process_manager.configure.call_args assert config_call[1]["min_workers"] == mock_config.process.min_workers assert config_call[1]["max_workers"] == mock_config.process.max_workers @pytest.mark.asyncio async def test_initialize_tiger_service(self, mock_config, mock_tiger_service): """Test Tiger API service initialization.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config # Execute Tiger service initialization await server._initialize_tiger_service() # Verify service was started assert server.tiger_service is not None mock_tiger_service.start.assert_called_once() @pytest.mark.asyncio async def test_start_background_tasks( self, mock_config, mock_account_manager, mock_process_manager ): """Test background task startup.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.account_manager = mock_account_manager server.process_manager = mock_process_manager # Mock asyncio.create_task mock_tasks = [MagicMock() for _ in range(3)] with patch("asyncio.create_task", side_effect=mock_tasks): # Execute background task startup await server._start_background_tasks() # Verify tasks were created and tracked assert len(server.background_tasks) == 3 assert all(task in server.background_tasks for task in mock_tasks) @pytest.mark.asyncio async def test_token_refresh_task(self, mock_config, mock_account_manager): """Test token refresh background task.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.account_manager = mock_account_manager # Mock shutdown event for quick termination server.shutdown_event.set() # Execute token refresh task await server._token_refresh_task() # Task should complete quickly due to shutdown event # Verify account manager method would be called if not shutdown assert mock_account_manager.refresh_expiring_tokens.call_count >= 0 @pytest.mark.asyncio async def test_token_refresh_task_error_handling( self, mock_config, mock_account_manager, capture_logs ): """Test token refresh task error handling.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.account_manager = mock_account_manager # Mock account manager to raise error mock_account_manager.refresh_expiring_tokens.side_effect = RuntimeError( "Token refresh failed" ) # Set shutdown event to terminate task after one iteration async def delayed_shutdown(): await asyncio.sleep(0.1) server.shutdown_event.set() # Run task and shutdown concurrently await asyncio.gather( server._token_refresh_task(), delayed_shutdown(), return_exceptions=True ) # Verify error was logged log_messages = [record.message for record in capture_logs] assert any("Error in token refresh task" in msg for msg in log_messages) @pytest.mark.asyncio async def test_health_monitor_task(self, mock_config, mock_process_manager): """Test health monitoring background task.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.process_manager = mock_process_manager # Mock shutdown event for quick termination server.shutdown_event.set() # Execute health monitor task await server._health_monitor_task() # Task should complete quickly due to shutdown event # Verify process manager methods would be called if not shutdown assert mock_process_manager.check_worker_health.call_count >= 0 assert mock_process_manager.auto_scale.call_count >= 0 @pytest.mark.asyncio async def test_health_monitor_task_with_unhealthy_workers( self, mock_config, mock_process_manager, capture_logs ): """Test health monitoring with unhealthy workers.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.process_manager = mock_process_manager # Mock unhealthy workers mock_process_manager.check_worker_health.return_value = [ "worker1", "worker2", ] # Set shutdown event to terminate task after one iteration async def delayed_shutdown(): await asyncio.sleep(0.1) server.shutdown_event.set() # Run task and shutdown concurrently await asyncio.gather( server._health_monitor_task(), delayed_shutdown(), return_exceptions=True, ) # Verify warning was logged log_messages = [record.message for record in capture_logs] assert any("unhealthy workers" in msg for msg in log_messages) @pytest.mark.asyncio async def test_cleanup_task( self, mock_config, mock_account_manager, mock_process_manager ): """Test cleanup background task.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.account_manager = mock_account_manager server.process_manager = mock_process_manager # Mock shutdown event for quick termination server.shutdown_event.set() # Execute cleanup task await server._cleanup_task() # Task should complete quickly due to shutdown event # Verify cleanup methods would be called if not shutdown assert mock_account_manager.cleanup_expired_sessions.call_count >= 0 assert mock_process_manager.cleanup_old_metrics.call_count >= 0 @pytest.mark.asyncio async def test_wait_for_shutdown(self, mock_config): """Test waiting for shutdown signal.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() # Set shutdown event in parallel async def trigger_shutdown(): await asyncio.sleep(0.1) server.shutdown_event.set() # Wait for shutdown await asyncio.gather(server.wait_for_shutdown(), trigger_shutdown()) # Should complete when shutdown event is set assert server.shutdown_event.is_set() @pytest.mark.asyncio async def test_cleanup_success( self, mock_config, mock_tiger_service, mock_process_manager, mock_account_manager, mock_db_manager, ): """Test successful server cleanup.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.tiger_service = mock_tiger_service server.process_manager = mock_process_manager server.account_manager = mock_account_manager server._started = True # Add mock background tasks mock_task1 = AsyncMock() mock_task2 = AsyncMock() server.background_tasks = [mock_task1, mock_task2] # Execute cleanup await server.cleanup() # Verify cleanup order and calls mock_task1.cancel.assert_called_once() mock_task2.cancel.assert_called_once() mock_tiger_service.stop.assert_called_once() mock_process_manager.stop.assert_called_once() mock_account_manager.cleanup.assert_called_once() mock_db_manager.cleanup.assert_called_once() # Verify final state assert server._started is False assert len(server.background_tasks) == 0 @pytest.mark.asyncio async def test_cleanup_with_task_timeout(self, mock_config, capture_logs): """Test cleanup with background task timeout.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config # Create mock task that doesn't complete mock_task = AsyncMock() server.background_tasks = [mock_task] # Mock asyncio.wait_for to raise timeout with patch( "asyncio.wait_for", side_effect=asyncio.TimeoutError("Tasks did not complete"), ): await server.cleanup() # Verify warning was logged log_messages = [record.message for record in capture_logs] assert any("did not complete within timeout" in msg for msg in log_messages) @pytest.mark.asyncio async def test_cleanup_with_service_errors( self, mock_config, mock_tiger_service, capture_logs ): """Test cleanup with service errors.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.tiger_service = mock_tiger_service # Mock service to raise error during stop mock_tiger_service.stop.side_effect = RuntimeError("Service stop failed") # Execute cleanup - should not raise exception await server.cleanup() # Verify error was logged log_messages = [record.message for record in capture_logs] assert any( "Error stopping Tiger API service" in msg for msg in log_messages ) @pytest.mark.asyncio async def test_start_server(self, mock_config): """Test server start operation.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() # Mock initialize method server.initialize = AsyncMock() # Start server await server.start() # Verify initialization was called server.initialize.assert_called_once() # Try to start again - should raise error with pytest.raises(RuntimeError, match="Server is already started"): await server.start() @pytest.mark.asyncio async def test_stop_server(self, mock_config): """Test server stop operation.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server._started = True # Mock cleanup method server.cleanup = AsyncMock() # Stop server await server.stop() # Verify shutdown event was set and cleanup called assert server.shutdown_event.is_set() server.cleanup.assert_called_once() # Try to stop when not started - should raise error server._started = False with pytest.raises(RuntimeError, match="Server is not started"): await server.stop() def test_is_started_property(self, mock_config): """Test is_started property.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() # Initially not started assert server.is_started is False # Set started server._started = True assert server.is_started is True # Set not started server._started = False assert server.is_started is False def test_get_health_status( self, mock_config, mock_process_manager, mock_account_manager ): """Test health status reporting.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.process_manager = mock_process_manager server.account_manager = mock_account_manager server._started = True server.background_tasks = [MagicMock(), MagicMock()] # Mock process manager workers mock_worker1 = MagicMock() mock_worker1.request_count = 100 mock_worker1.error_count = 5 mock_worker2 = MagicMock() mock_worker2.request_count = 75 mock_worker2.error_count = 2 mock_process_manager.workers = { "worker1": mock_worker1, "worker2": mock_worker2, } # Mock account manager accounts mock_account1 = MagicMock() mock_account1.status.value = "active" mock_account2 = MagicMock() mock_account2.status.value = "inactive" mock_account_manager.accounts = { "account1": mock_account1, "account2": mock_account2, } # Get health status status = server.get_health_status() # Verify status structure assert "server" in status assert "process_pool" in status assert "accounts" in status # Verify server status server_status = status["server"] assert server_status["started"] is True assert server_status["environment"] == mock_config.environment assert server_status["background_tasks"] == 2 # Verify process pool status process_status = status["process_pool"] assert process_status["active_workers"] == 2 assert process_status["total_requests"] == 175 # 100 + 75 assert process_status["failed_requests"] == 7 # 5 + 2 # Verify account status account_status = status["accounts"] assert account_status["total_accounts"] == 2 assert account_status["active_accounts"] == 1 def test_get_health_status_with_errors( self, mock_config, mock_process_manager, mock_account_manager ): """Test health status reporting with service errors.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.process_manager = mock_process_manager server.account_manager = mock_account_manager # Mock process manager to raise error mock_process_manager.workers.side_effect = RuntimeError( "Process manager error" ) # Mock account manager to raise error mock_account_manager.accounts.side_effect = RuntimeError( "Account manager error" ) # Get health status status = server.get_health_status() # Verify error handling assert "process_pool" in status assert "error" in status["process_pool"] assert "accounts" in status assert "error" in status["accounts"] def test_signal_handler_setup(self, mock_config): """Test signal handler setup.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager with patch("signal.signal") as mock_signal: # Create server (should setup signal handlers) TigerMCPServer() # Verify signal handlers were setup (except on Windows) if sys.platform != "win32": assert mock_signal.call_count >= 1 # Verify SIGTERM and SIGINT handlers signal_calls = [call[0] for call in mock_signal.call_args_list] assert signal.SIGTERM in signal_calls assert signal.SIGINT in signal_calls def test_signal_handler_execution(self, mock_config): """Test signal handler execution.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() # Verify shutdown event is not set initially assert not server.shutdown_event.is_set() # Execute signal handler server._signal_handler(signal.SIGTERM, None) # Verify shutdown event is set assert server.shutdown_event.is_set() class TestTigerMCPServerIntegration: """Integration tests for TigerMCPServer.""" @pytest.mark.integration @pytest.mark.asyncio async def test_server_full_lifecycle( self, mock_config, mock_db_manager, mock_account_manager, mock_account_router, mock_process_manager, mock_tiger_service, ): """Test complete server lifecycle.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_config_manager.load_config.return_value = mock_config mock_get_config_manager.return_value = mock_config_manager # Create server server = TigerMCPServer() try: # Start server await server.start() # Verify server is started assert server.is_started is True assert server.config == mock_config # Verify all services were initialized mock_db_manager.initialize.assert_called_once() mock_account_manager.initialize.assert_called_once() mock_process_manager.configure.assert_called_once() mock_process_manager.start.assert_called_once() mock_tiger_service.start.assert_called_once() # Verify background tasks were started assert len(server.background_tasks) > 0 # Get health status status = server.get_health_status() assert status["server"]["started"] is True # Test shutdown signal server.shutdown_event.set() await server.wait_for_shutdown() assert server.shutdown_event.is_set() finally: # Stop server if server.is_started: await server.stop() # Verify cleanup was performed assert server.is_started is False @pytest.mark.integration @pytest.mark.asyncio async def test_server_error_recovery(self, mock_config): """Test server error recovery during initialization.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_config_manager.load_config.return_value = mock_config mock_get_config_manager.return_value = mock_config_manager # Create server server = TigerMCPServer() # Mock database initialization failure with patch( "database.get_db_manager", side_effect=RuntimeError("Database connection failed"), ): with pytest.raises(RuntimeError, match="Database connection failed"): await server.initialize() # Verify server is not started assert server.is_started is False @pytest.mark.integration @pytest.mark.slow @pytest.mark.asyncio async def test_server_background_task_management( self, mock_config, mock_account_manager, mock_process_manager ): """Test background task management over time.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_config_manager.load_config.return_value = mock_config mock_get_config_manager.return_value = mock_config_manager # Create server with fast intervals for testing server = TigerMCPServer() server.config = mock_config server.config.process.health_check_interval = 0.1 # Fast interval server.config.security.token_refresh_threshold = 0.2 server.account_manager = mock_account_manager server.process_manager = mock_process_manager try: # Start background tasks await server._start_background_tasks() # Let tasks run for a short time await asyncio.sleep(0.5) # Verify tasks are running assert len(server.background_tasks) > 0 assert all(not task.done() for task in server.background_tasks) # Trigger shutdown server.shutdown_event.set() # Wait a bit for tasks to notice shutdown await asyncio.sleep(0.2) # Cleanup tasks for task in server.background_tasks: task.cancel() await asyncio.gather(*server.background_tasks, return_exceptions=True) finally: # Ensure cleanup server.shutdown_event.set() for task in server.background_tasks: if not task.done(): task.cancel() if server.background_tasks: await asyncio.gather( *server.background_tasks, return_exceptions=True ) @pytest.mark.integration @pytest.mark.asyncio async def test_server_concurrent_operations( self, mock_config, mock_account_manager, mock_process_manager ): """Test server handling concurrent operations.""" with patch("mcp_server.server.get_config_manager") as mock_get_config_manager: mock_config_manager = MagicMock() mock_config_manager.load_config.return_value = mock_config mock_get_config_manager.return_value = mock_config_manager server = TigerMCPServer() server.config = mock_config server.account_manager = mock_account_manager server.process_manager = mock_process_manager server._started = True # Simulate concurrent health status requests async def get_health_multiple_times(): tasks = [ asyncio.create_task(asyncio.to_thread(server.get_health_status)) for _ in range(10) ] return await asyncio.gather(*tasks) # Execute concurrent operations results = await get_health_multiple_times() # Verify all requests succeeded assert len(results) == 10 assert all(isinstance(result, dict) for result in results) assert all("server" in result for result in results)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/luxiaolei/tiger-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server