Skip to main content
Glama
test_service.py12.1 kB
"""Tests for JobManager service.""" import asyncio import pytest import uuid from mcp_background_job.config import BackgroundJobConfig from mcp_background_job.models import JobStatus, ProcessOutput from mcp_background_job.service import JobManager class TestJobManager: """Test cases for JobManager class.""" def test_init(self): """Test JobManager initialization.""" manager = JobManager() assert manager.config is not None assert len(manager._jobs) == 0 assert len(manager._processes) == 0 def test_init_with_config(self): """Test JobManager initialization with custom config.""" config = BackgroundJobConfig(max_concurrent_jobs=5, max_output_size_bytes=1024) manager = JobManager(config) assert manager.config == config assert manager.config.max_concurrent_jobs == 5 assert manager.config.max_output_size_bytes == 1024 @pytest.mark.asyncio async def test_execute_command_success(self): """Test successful command execution.""" manager = JobManager() job_id = await manager.execute_command("echo 'hello world'") # Verify job_id is a valid UUID uuid.UUID(job_id) # Verify job was created assert job_id in manager._jobs assert job_id in manager._processes job = manager._jobs[job_id] assert job.command == "echo 'hello world'" assert job.status == JobStatus.RUNNING assert job.pid is not None @pytest.mark.asyncio async def test_execute_command_empty(self): """Test execution with empty command.""" manager = JobManager() with pytest.raises(ValueError, match="Command cannot be empty"): await manager.execute_command("") with pytest.raises(ValueError, match="Command cannot be empty"): await manager.execute_command(" ") @pytest.mark.asyncio async def test_execute_command_job_limit(self): """Test job limit enforcement.""" config = BackgroundJobConfig(max_concurrent_jobs=1) manager = JobManager(config) # Start first job job_id1 = await manager.execute_command("sleep 10") assert job_id1 in manager._jobs # Try to start second job - should fail with pytest.raises(RuntimeError, match="Maximum concurrent jobs limit"): await manager.execute_command("echo 'second job'") @pytest.mark.asyncio async def test_get_job_status(self): """Test getting job status.""" manager = JobManager() job_id = await manager.execute_command("echo 'hello'") status = await manager.get_job_status(job_id) assert status in [JobStatus.RUNNING, JobStatus.COMPLETED] @pytest.mark.asyncio async def test_get_job_status_not_found(self): """Test getting status for non-existent job.""" manager = JobManager() fake_id = str(uuid.uuid4()) with pytest.raises(KeyError, match=f"Job {fake_id} not found"): await manager.get_job_status(fake_id) @pytest.mark.asyncio async def test_list_jobs(self): """Test listing jobs.""" manager = JobManager() # Initially empty jobs = await manager.list_jobs() assert len(jobs) == 0 # Add some jobs job_id1 = await manager.execute_command("echo 'job1'") job_id2 = await manager.execute_command("echo 'job2'") jobs = await manager.list_jobs() assert len(jobs) == 2 job_ids = [job.job_id for job in jobs] assert job_id1 in job_ids assert job_id2 in job_ids # Verify job summaries contain expected fields for job in jobs: assert hasattr(job, "job_id") assert hasattr(job, "status") assert hasattr(job, "command") assert hasattr(job, "started") @pytest.mark.asyncio async def test_get_job_output(self): """Test getting job output.""" manager = JobManager() job_id = await manager.execute_command("echo 'hello world'") # Wait a bit for command to complete await asyncio.sleep(0.5) output = await manager.get_job_output(job_id) assert isinstance(output, ProcessOutput) assert "hello world" in output.stdout @pytest.mark.asyncio async def test_get_job_output_not_found(self): """Test getting output for non-existent job.""" manager = JobManager() fake_id = str(uuid.uuid4()) with pytest.raises(KeyError, match=f"Job {fake_id} not found"): await manager.get_job_output(fake_id) @pytest.mark.asyncio async def test_tail_job_output(self): """Test tailing job output.""" manager = JobManager() job_id = await manager.execute_command("echo -e 'line1\\nline2\\nline3'") # Wait for command to complete await asyncio.sleep(0.5) # Tail last 2 lines output = await manager.tail_job_output(job_id, 2) assert isinstance(output, ProcessOutput) # Should contain the last lines lines = output.stdout.split("\n") assert len(lines) <= 3 # 2 lines plus potentially empty line @pytest.mark.asyncio async def test_tail_job_output_invalid_lines(self): """Test tailing with invalid line count.""" manager = JobManager() job_id = await manager.execute_command("echo 'test'") with pytest.raises(ValueError, match="Number of lines must be positive"): await manager.tail_job_output(job_id, 0) with pytest.raises(ValueError, match="Number of lines must be positive"): await manager.tail_job_output(job_id, -1) @pytest.mark.asyncio async def test_kill_job(self): """Test killing a job.""" manager = JobManager() job_id = await manager.execute_command("sleep 10") # Kill the job result = await manager.kill_job(job_id) assert result == "killed" # Verify job status changed status = await manager.get_job_status(job_id) assert status == JobStatus.KILLED @pytest.mark.asyncio async def test_kill_job_not_found(self): """Test killing non-existent job.""" manager = JobManager() fake_id = str(uuid.uuid4()) result = await manager.kill_job(fake_id) assert result == "not_found" @pytest.mark.asyncio async def test_kill_job_already_terminated(self): """Test killing already terminated job.""" manager = JobManager() job_id = await manager.execute_command("echo 'quick job'") # Wait for job to complete await asyncio.sleep(0.5) # Try to kill completed job result = await manager.kill_job(job_id) assert result == "already_terminated" @pytest.mark.asyncio async def test_interact_with_job(self): """Test interacting with a job.""" manager = JobManager() # Use cat command which reads from stdin job_id = await manager.execute_command("cat") # Send input to the job output = await manager.interact_with_job(job_id, "hello") assert isinstance(output, ProcessOutput) # Clean up await manager.kill_job(job_id) @pytest.mark.asyncio async def test_interact_with_job_not_found(self): """Test interacting with non-existent job.""" manager = JobManager() fake_id = str(uuid.uuid4()) with pytest.raises(KeyError, match=f"Job {fake_id} not found"): await manager.interact_with_job(fake_id, "input") @pytest.mark.asyncio async def test_interact_with_job_not_running(self): """Test interacting with non-running job.""" manager = JobManager() job_id = await manager.execute_command("echo 'done'") # Wait for job to complete await asyncio.sleep(0.5) with pytest.raises(RuntimeError, match="is not running"): await manager.interact_with_job(job_id, "input") @pytest.mark.asyncio async def test_get_job(self): """Test getting complete job information.""" manager = JobManager() job_id = await manager.execute_command("echo 'test'") job = await manager.get_job(job_id) assert job.job_id == job_id assert job.command == "echo 'test'" assert job.status in [JobStatus.RUNNING, JobStatus.COMPLETED] assert job.started is not None assert job.pid is not None @pytest.mark.asyncio async def test_get_job_not_found(self): """Test getting non-existent job.""" manager = JobManager() fake_id = str(uuid.uuid4()) with pytest.raises(KeyError, match=f"Job {fake_id} not found"): await manager.get_job(fake_id) def test_get_stats(self): """Test getting job statistics.""" manager = JobManager() # Initially empty stats = manager.get_stats() assert stats["total"] == 0 assert stats["running"] == 0 assert stats["completed"] == 0 assert stats["failed"] == 0 assert stats["killed"] == 0 @pytest.mark.asyncio async def test_get_stats_with_jobs(self): """Test getting statistics with jobs.""" manager = JobManager() # Add a job await manager.execute_command("echo 'test'") stats = manager.get_stats() assert stats["total"] == 1 assert stats["running"] >= 0 # Could be running or completed by now def test_cleanup_completed_jobs(self): """Test cleanup of completed jobs.""" manager = JobManager() # Initially no cleanup needed count = manager.cleanup_completed_jobs() assert count == 0 @pytest.mark.asyncio async def test_cleanup_completed_jobs_with_jobs(self): """Test cleanup with completed jobs.""" manager = JobManager() # Start and complete a job await manager.execute_command("echo 'cleanup test'") await asyncio.sleep(0.5) # Wait for completion # Cleanup should clean up the completed process count = manager.cleanup_completed_jobs() assert count >= 0 # Should clean up at least the completed job @pytest.mark.asyncio async def test_shutdown(self): """Test graceful shutdown.""" manager = JobManager() # Start a long-running job job_id = await manager.execute_command("sleep 5") # Shutdown should kill running jobs await manager.shutdown() # Job should be killed status = await manager.get_job_status(job_id) assert status == JobStatus.KILLED class TestJobManagerEdgeCases: """Test edge cases and error conditions.""" @pytest.mark.asyncio async def test_update_job_status_missing_process(self): """Test status update when process wrapper is missing.""" manager = JobManager() job_id = await manager.execute_command("echo 'test'") # Remove process wrapper to simulate edge case del manager._processes[job_id] # Should not crash when updating status await manager._update_job_status(job_id) # Job should be marked as failed job = manager._jobs[job_id] assert job.status == JobStatus.FAILED @pytest.mark.asyncio async def test_update_nonexistent_job(self): """Test updating status of non-existent job.""" manager = JobManager() # Should not crash await manager._update_job_status("non-existent-id") @pytest.mark.asyncio async def test_execute_command_process_start_failure(self): """Test handling of process start failure.""" manager = JobManager() # Command that will fail to start with pytest.raises(Exception): await manager.execute_command("/nonexistent/command") # Should not leave orphaned entries assert len(manager._jobs) == 0 assert len(manager._processes) == 0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dylan-gluck/mcp-background-job'

If you have feedback or need assistance with the MCP directory API, please join our Discord server