Skip to main content
Glama
test_server_integration.py8.49 kB
"""Integration tests for MCP server.""" import asyncio import pytest import uuid from mcp_background_job.config import BackgroundJobConfig from mcp_background_job.models import JobStatus from mcp_background_job.server import get_job_manager from mcp_background_job.service import JobManager class TestServerIntegration: """Integration tests for the MCP server functionality.""" @pytest.fixture def job_manager(self): """Create a fresh job manager for each test.""" config = BackgroundJobConfig(max_concurrent_jobs=5) return JobManager(config) @pytest.mark.asyncio async def test_job_manager_singleton(self): """Test that get_job_manager returns a singleton instance.""" manager1 = get_job_manager() manager2 = get_job_manager() assert manager1 is manager2 assert isinstance(manager1, JobManager) @pytest.mark.asyncio async def test_complete_job_workflow(self, job_manager): """Test complete job workflow: execute -> status -> output -> kill.""" # Execute a command job_id = await job_manager.execute_command("echo 'Hello World'") assert uuid.UUID(job_id) # Should be valid UUID # Check initial status status = await job_manager.get_job_status(job_id) assert status in [JobStatus.RUNNING, JobStatus.COMPLETED] # Wait a bit for completion await asyncio.sleep(0.5) # Check final status final_status = await job_manager.get_job_status(job_id) assert final_status == JobStatus.COMPLETED # Get output output = await job_manager.get_job_output(job_id) assert "Hello World" in output.stdout # Try to kill completed job kill_result = await job_manager.kill_job(job_id) assert kill_result == "already_terminated" @pytest.mark.asyncio async def test_interactive_job_workflow(self, job_manager): """Test interactive job workflow: execute -> interact -> kill.""" # Start an interactive command job_id = await job_manager.execute_command("cat") # Send input to the job result = await job_manager.interact_with_job(job_id, "hello") assert isinstance(result.stdout, str) # Kill the job kill_result = await job_manager.kill_job(job_id) assert kill_result == "killed" # Verify job is killed status = await job_manager.get_job_status(job_id) assert status == JobStatus.KILLED @pytest.mark.asyncio async def test_concurrent_jobs(self, job_manager): """Test handling multiple concurrent jobs.""" # Start multiple jobs job_ids = [] for i in range(3): job_id = await job_manager.execute_command(f"echo 'Job {i}'") job_ids.append(job_id) # Wait for completion await asyncio.sleep(1.0) # Check all jobs jobs = await job_manager.list_jobs() assert len(jobs) >= 3 # Verify job IDs are in the list listed_job_ids = [job.job_id for job in jobs] for job_id in job_ids: assert job_id in listed_job_ids @pytest.mark.asyncio async def test_tail_functionality(self, job_manager): """Test tail functionality with multi-line output.""" # Create a job with multiple lines of output job_id = await job_manager.execute_command( "echo -e 'line1\\nline2\\nline3\\nline4\\nline5'" ) # Wait for completion await asyncio.sleep(0.5) # Tail last 3 lines tail_output = await job_manager.tail_job_output(job_id, 3) lines = [line for line in tail_output.stdout.split("\n") if line.strip()] # Should have at most 3 non-empty lines assert len(lines) <= 3 # Should contain some of the expected lines output_text = tail_output.stdout assert any(f"line{i}" in output_text for i in [3, 4, 5]) @pytest.mark.asyncio async def test_job_limit_enforcement(self, job_manager): """Test that job limits are enforced.""" # Set a low job limit job_manager.config.max_concurrent_jobs = 2 # Start jobs up to the limit job_ids = [] for i in range(2): job_id = await job_manager.execute_command("sleep 2") job_ids.append(job_id) # Try to start one more job - should fail with pytest.raises(RuntimeError, match="Maximum concurrent jobs limit"): await job_manager.execute_command("echo 'should fail'") # Clean up for job_id in job_ids: await job_manager.kill_job(job_id) @pytest.mark.asyncio async def test_error_handling(self, job_manager): """Test error handling for various edge cases.""" # Test with non-existent job ID fake_job_id = str(uuid.uuid4()) with pytest.raises(KeyError): await job_manager.get_job_status(fake_job_id) with pytest.raises(KeyError): await job_manager.get_job_output(fake_job_id) with pytest.raises(KeyError): await job_manager.interact_with_job(fake_job_id, "input") # kill_job should return "not_found" instead of raising result = await job_manager.kill_job(fake_job_id) assert result == "not_found" @pytest.mark.asyncio async def test_job_statistics(self, job_manager): """Test job statistics functionality.""" # Initial stats stats = job_manager.get_stats() initial_total = stats["total"] # Add some jobs await job_manager.execute_command("echo 'job1'") job_id2 = await job_manager.execute_command("sleep 5") # Check updated stats stats = job_manager.get_stats() assert stats["total"] == initial_total + 2 assert stats["running"] >= 1 # At least the sleep job should be running # Clean up await job_manager.kill_job(job_id2) @pytest.mark.asyncio async def test_cleanup_functionality(self, job_manager): """Test cleanup of completed jobs.""" # Start and complete a job job_id = await job_manager.execute_command("echo 'cleanup test'") await asyncio.sleep(0.5) # Wait for completion # Verify job exists jobs = await job_manager.list_jobs() job_ids = [job.job_id for job in jobs] assert job_id in job_ids # Cleanup completed jobs cleanup_count = job_manager.cleanup_completed_jobs() assert cleanup_count >= 0 # Should clean up at least one job # Job record should still exist (cleanup only removes process wrappers) jobs_after = await job_manager.list_jobs() job_ids_after = [job.job_id for job in jobs_after] assert job_id in job_ids_after class TestServerErrorScenarios: """Test error scenarios and edge cases.""" @pytest.fixture def job_manager(self): """Create a fresh job manager for each test.""" return JobManager() @pytest.mark.asyncio async def test_invalid_commands(self, job_manager): """Test handling of invalid commands.""" # Empty command with pytest.raises(ValueError, match="Command cannot be empty"): await job_manager.execute_command("") # Whitespace-only command with pytest.raises(ValueError, match="Command cannot be empty"): await job_manager.execute_command(" ") @pytest.mark.asyncio async def test_interact_with_completed_job(self, job_manager): """Test interaction with a completed job.""" # Start and complete a job job_id = await job_manager.execute_command("echo 'done'") await asyncio.sleep(0.5) # Wait for completion # Try to interact with completed job with pytest.raises(RuntimeError, match="not running"): await job_manager.interact_with_job(job_id, "input") @pytest.mark.asyncio async def test_invalid_tail_parameters(self, job_manager): """Test tail with invalid parameters.""" job_id = await job_manager.execute_command("echo 'test'") # Invalid line counts with pytest.raises(ValueError, match="Number of lines must be positive"): await job_manager.tail_job_output(job_id, 0) with pytest.raises(ValueError, match="Number of lines must be positive"): await job_manager.tail_job_output(job_id, -1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dylan-gluck/mcp-background-job'

If you have feedback or need assistance with the MCP directory API, please join our Discord server